diff options
author | Eric Wong <e@80x24.org> | 2016-12-31 11:16:47 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2017-01-07 23:38:29 +0000 |
commit | 68f83ca5236078128a0ccf47a1e54bd955777120 (patch) | |
tree | f60805615cb69e97c7e79182d2b991f67f900210 | |
parent | 0753326bc4f148436b4c34dfb8133b5579de0198 (diff) | |
download | public-inbox-68f83ca5236078128a0ccf47a1e54bd955777120.tar.gz |
This will allow us to handle network operations while waiting on "git cat-file" to seek and unpack things.
-rw-r--r-- | MANIFEST | 3 |
-rw-r--r-- | lib/PublicInbox/Git.pm | 89 |
-rw-r--r-- | lib/PublicInbox/GitAsyncRd.pm | 133 |
-rw-r--r-- | lib/PublicInbox/GitAsyncWr.pm | 23 |
-rw-r--r-- | t/git_async.t | 138 |
5 files changed, 386 insertions, 0 deletions
@@ -55,6 +55,8 @@ lib/PublicInbox/Filter/Mirror.pm lib/PublicInbox/Filter/Vger.pm lib/PublicInbox/GetlineBody.pm lib/PublicInbox/Git.pm +lib/PublicInbox/GitAsyncRd.pm +lib/PublicInbox/GitAsyncWr.pm lib/PublicInbox/GitHTTPBackend.pm lib/PublicInbox/HTTP.pm lib/PublicInbox/HTTPD.pm @@ -151,6 +153,7 @@ t/git-http-backend.psgi t/git-http-backend.t t/git.fast-import-data t/git.t +t/git_async.t t/html_index.t t/httpd-corner.psgi t/httpd-corner.t diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 4dfc4099..0b482c7e 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -13,6 +13,10 @@ use POSIX qw(dup2); require IO::Handle; use PublicInbox::Spawn qw(spawn popen_rd); use Fcntl qw(:seek); +my $have_async = eval { + require PublicInbox::EvCleanup; + require PublicInbox::GitAsyncRd; +}; # Documentation/SubmittingPatches recommends 12 (Linux v4.4) my $abbrev = `git config core.abbrev` || 12; @@ -64,6 +68,7 @@ sub _bidi_pipe { $self->{$in} = $in_r; } +# legacy synchronous API sub cat_file_begin { my ($self, $obj) = @_; $self->_bidi_pipe(qw(--batch in out pid)); @@ -79,6 +84,7 @@ sub cat_file_begin { ($in, $1, $2, $3); } +# legacy synchronous API sub cat_file_finish { my ($self, $left) = @_; my $max = 8192; @@ -96,6 +102,7 @@ sub cat_file_finish { fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n"); } +# legacy synchronous API sub cat_file { my ($self, $obj, $ref) = @_; @@ -131,6 +138,7 @@ sub cat_file { sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) } +# legacy synchronous API sub check { my ($self, $obj) = @_; $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c)); @@ -185,10 +193,91 @@ sub cleanup { my ($self) = @_; _destroy($self, qw(in out pid)); _destroy($self, qw(in_c out_c pid_c)); + + if ($have_async) { + my %h = %$self; # yup, copy ourselves + %$self = (); + my $ds_closed; + + # schedule closing with Danga::Socket::close: + foreach (qw(async async_c)) { + my $ds = delete $h{$_} or next; + $ds->close; + 
$ds_closed = 1; + } + + # can't do waitpid in _destroy() until next tick, + # since D::S defers closing until end of current event loop + $ds_closed and PublicInbox::EvCleanup::next_tick(sub { + _destroy(\%h, qw(in_a out_a pid_a)); + _destroy(\%h, qw(in_ac out_ac pid_ac)); + }); + } } sub DESTROY { cleanup(@_) } +# modern async API +sub check_async_ds ($$$) { + my ($self, $obj, $cb) = @_; + ($self->{async_c} ||= do { + _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac)); + PublicInbox::GitAsyncRd->new($self->{in_ac}, $self->{out_ac}, 1) + })->cat_file_async($obj, $cb); +} + +sub cat_async_ds ($$$) { + my ($self, $obj, $cb) = @_; + ($self->{async} ||= do { + _bidi_pipe($self, qw(--batch in_a out_a pid_a)); + PublicInbox::GitAsyncRd->new($self->{in_a}, $self->{out_a}); + })->cat_file_async($obj, $cb); +} + +sub async_info_compat ($) { + local $/ = "\n"; + chomp(my $line = $_[0]->getline); + [ split(/ /, $line) ]; +} + +sub check_async_compat ($$$) { + my ($self, $obj, $cb) = @_; + $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c)); + $self->{out_c}->print($obj."\n") or fail($self, "write error: $!"); + my $info = async_info_compat($self->{in_c}); + $cb->($info); +} + +sub cat_async_compat ($$$) { + my ($self, $obj, $cb) = @_; + $self->_bidi_pipe(qw(--batch in out pid)); + $self->{out}->print($obj."\n") or fail($self, "write error: $!"); + my $in = $self->{in}; + my $info = async_info_compat($in); + $cb->($info); + return if scalar(@$info) != 3; # missing + my $max = 8192; + my $left = $info->[2]; + my ($buf, $r); + while ($left > 0) { + $r = read($in, $buf, $left > $max ? 
$max : $left); + return $cb->($r) unless $r; # undef or 0 + $left -= $r; + $cb->(\$buf); + } + $r = read($in, $buf, 1); + defined($r) or fail($self, "read failed: $!"); + fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n"); +} + +if ($have_async) { + *check_async = *check_async_ds; + *cat_async = *cat_async_ds; +} else { + *check_async = *check_async_compat; + *cat_async = *cat_async_compat; +} + 1; __END__ =pod diff --git a/lib/PublicInbox/GitAsyncRd.pm b/lib/PublicInbox/GitAsyncRd.pm new file mode 100644 index 00000000..a56dc392 --- /dev/null +++ b/lib/PublicInbox/GitAsyncRd.pm @@ -0,0 +1,133 @@ +# Copyright (C) 2016 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +# +# internal class used by PublicInbox::Git + Danga::Socket +# This parses the output pipe of "git cat-file --batch/--batch-check" +package PublicInbox::GitAsyncRd; +use strict; +use warnings; +use base qw(Danga::Socket); +use fields qw(jobq rbuf wr check); +use PublicInbox::GitAsyncWr; +our $MAX = 65536; # Import may bump this in the future + +sub new { + my ($class, $rd, $wr, $check) = @_; + my $self = fields::new($class); + IO::Handle::blocking($rd, 0); + $self->SUPER::new($rd); + $self->{jobq} = []; # [ [ $obj, $cb, $state ], ... 
] + my $buf = ''; + $self->{rbuf} = \$buf; + $self->{wr} = PublicInbox::GitAsyncWr->new($wr); + $self->{check} = $check; + $self->watch_read(1); + $self; +} + +sub cat_file_async { + my ($self, $obj, $cb) = @_; + # order matters + push @{$self->{jobq}}, [ $obj, $cb ]; + $self->{wr}->write($obj."\n"); +} + +# Returns: an array ref of the info line for --batch-check and --batch, +# which may be: [ $obj, 'missing'] +# Returns undef on error +sub read_info ($) { + my ($self) = @_; + my $rbuf = $self->{rbuf}; + my $rd = $self->{sock}; + + while (1) { + $$rbuf =~ s/\A([^\n]+)\n//s and return [ split(/ /, $1) ]; + + my $r = sysread($rd, $$rbuf, 110, length($$rbuf)); + next if $r; + return $r; + } +} + +sub event_read { + my ($self) = @_; + my $jobq = $self->{jobq}; + my ($cur, $obj, $cb, $info, $left); + my $check = $self->{check}; + my ($rbuf, $rlen, $need, $buf); +take_job: + $cur = shift @$jobq or die 'BUG: empty job queue in '.__PACKAGE__; + ($obj, $cb, $info, $left) = @$cur; + if (!$info) { + $info = read_info($self); + if (!defined $info && ($!{EAGAIN} || $!{EINTR})) { + return unshift(@$jobq, $cur) + } + $cb->($info); # $info may 0 (EOF, or undef, $cb will see $!) + return $self->close unless $info; + if ($check || (scalar(@$info) != 3)) { + # do not monopolize the event loop if we're drained: + return if ${$self->{rbuf}} eq ''; + goto take_job; + } + $cur->[2] = $info; + my $len = $info->[2]; + $left = \$len; + $cur->[3] = $left; # onto reading body... + } + ref($left) or die 'BUG: $left not ref in '.__PACKAGE__; + + $rbuf = $self->{rbuf}; + $rlen = length($$rbuf); + $need = $$left + 1; # +1 for trailing LF + $buf = ''; + + if ($rlen == $need) { +final_hunk: + $self->{rbuf} = \$buf; + $$left = undef; + my $lf = chop $$rbuf; + $lf eq "\n" or die "BUG: missing LF (got $lf)"; + $cb->($rbuf); + + return if $buf eq ''; + goto take_job; + } elsif ($rlen < $need) { + my $all = $need - $rlen; + my $n = $all > $MAX ? 
$MAX : $all; + my $r = sysread($self->{sock}, $$rbuf, $n, $rlen); + if ($r) { + goto final_hunk if $r == $all; + + # more to read later... + $$left -= $r; + $self->{rbuf} = \$buf; + $cb->($rbuf); + + # don't monopolize the event loop + return unshift(@$jobq, $cur); + } elsif (!defined $r) { + return unshift(@$jobq, $cur) if $!{EAGAIN} || $!{EINTR}; + } + $cb->($r); # $cb should handle 0 and undef (and see $!) + $self->close; # FAIL... + } else { # too much data in rbuf + $buf = substr($$rbuf, $need, $rlen - $need); + $$rbuf = substr($$rbuf, 0, $need); + goto final_hunk; + } +} + +sub close { + my $self = shift; + my $jobq = $self->{jobq}; + $self->{jobq} = []; + $_->[1]->(0) for @$jobq; + $self->{wr}->close; + $self->SUPER::close(@_); +} + +sub event_hup { $_[0]->close } +sub event_err { $_[0]->close } + +1; diff --git a/lib/PublicInbox/GitAsyncWr.pm b/lib/PublicInbox/GitAsyncWr.pm new file mode 100644 index 00000000..c22f2fcc --- /dev/null +++ b/lib/PublicInbox/GitAsyncWr.pm @@ -0,0 +1,23 @@ +# Copyright (C) 2016 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +# +# internal class used by PublicInbox::Git + Danga::Socket +# This writes to the input pipe of "git cat-file --batch/--batch-check" +package PublicInbox::GitAsyncWr; +use strict; +use warnings; +use base qw(Danga::Socket); + +sub new { + my ($class, $io) = @_; + my $self = fields::new($class); + IO::Handle::blocking($io, 0); + $self->SUPER::new($io); +} + +# we only care about write + event_write + +sub event_hup { $_[0]->close } +sub event_err { $_[0]->close } + +1; diff --git a/t/git_async.t b/t/git_async.t new file mode 100644 index 00000000..c20d48e3 --- /dev/null +++ b/t/git_async.t @@ -0,0 +1,138 @@ +# Copyright (C) 2016 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use warnings; +use Test::More; +$SIG{PIPE} = 'IGNORE'; +foreach my $mod (qw(Danga::Socket)) { + eval 
"require $mod"; + plan skip_all => "$mod missing for git_async.t" if $@; +} +use File::Temp qw/tempdir/; +use Cwd qw/getcwd/; +my $tmpdir = tempdir('git_async-XXXXXX', TMPDIR => 1, CLEANUP => 1); +use_ok 'PublicInbox::Git'; +my $dir = "$tmpdir/git.git"; +{ + is(system(qw(git init -q --bare), $dir), 0, 'created git directory'); + my @cmd = ('git', "--git-dir=$dir", 'fast-import', '--quiet'); + my $fi_data = getcwd().'/t/git.fast-import-data'; + ok(-r $fi_data, "fast-import data readable (or run test at top level)"); + my $pid = fork; + defined $pid or die "fork failed: $!\n"; + if ($pid == 0) { + open STDIN, '<', $fi_data or die "open $fi_data: $!\n"; + exec @cmd; + die "failed exec: ",join(' ', @cmd),": $!\n"; + } + waitpid $pid, 0; + is($?, 0, 'fast-import succeeded'); +} + +{ + my $f = 'HEAD:foo.txt'; + my @args; + my $n = 0; + my $git = PublicInbox::Git->new($dir); + Danga::Socket->SetPostLoopCallback(sub { + my ($fdmap) = @_; + foreach (values %$fdmap) { + return 1 if ref($_) =~ /::GitAsync/; + } + 0 + }); + $git->check_async($f, sub { + $n++; + @args = @_; + $git = undef; + }); + Danga::Socket->EventLoop; + my @exp = PublicInbox::Git->new($dir)->check($f); + my $exp = [ \@exp ]; + is_deeply(\@args, $exp, 'matches regular check'); + is($n, 1, 'callback only called once'); + $git = PublicInbox::Git->new($dir); + $n = 0; + my $max = 100; + my $missing = 'm'; + my $m = 0; + for my $i (0..$max) { + my $k = "HEAD:m$i"; + $git->check_async($k, sub { + my ($info) = @_; + ++$n; + ++$m if $info->[1] eq 'missing' && $info->[0] eq $k; + }); + if ($git->{async_c}->{wr}->{write_buf_size}) { + diag("async_check capped at $i"); + $max = $i; + last; + } + } + is($m, $n, 'everything expected missing is missing'); + $git->check_async($f, sub { $git = undef }); + Danga::Socket->EventLoop; + + $git = PublicInbox::Git->new($dir); + my $info; + my $str = ''; + my @missing; + $git->cat_async('HEAD:miss', sub { + my ($miss) = @_; + push @missing, $miss; + }); + $git->cat_async($f, sub 
{ + my $res = $_[0]; + if (ref($res) eq 'ARRAY') { + is($info, undef, 'info unset, setting..'); + $info = $res; + } elsif (ref($res) eq 'SCALAR') { + $str .= $$res; + if (length($str) >= $info->[2]) { + is($info->[2], length($str), 'length match'); + $git = undef + } + } + }); + Danga::Socket->EventLoop; + is_deeply(\@missing, [['HEAD:miss', 'missing']], 'missing cat OK'); + is($git, undef, 'git undefined'); + $git = PublicInbox::Git->new($dir); + my $sref = $git->cat_file($f); + is($str, $$sref, 'matches synchronous version'); + $git = undef; + Danga::Socket->RunTimers; +} + +{ + my $git = PublicInbox::Git->new($dir); + foreach my $s (qw(check_async_compat cat_async_compat)) { + my @missing; + $git->check_async_compat('HED:miss1ng', sub { + my ($miss) = @_; + push @missing, $miss; + }); + is_deeply(\@missing, [['HED:miss1ng', 'missing']], + "missing $s OK"); + } + my @info; + my $str = ''; + $git->cat_async_compat('HEAD:foo.txt', sub { + my $ref = $_[0]; + my $t = ref $ref; + if ($t eq 'ARRAY') { + push @info, $ref; + } elsif ($t eq 'SCALAR') { + $str .= $$ref; + } else { + fail "fail type: $t"; + } + }); + is_deeply(\@info, [ [ 'bf4f17855632367a160bef055fc8ba4675d10e6b', + 'blob', 18 ]], 'info matches compat'); + is($str, "-----\nhello\nworld\n", 'data matches compat'); +} + +done_testing(); + +1; |