about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2016-12-31 11:16:47 +0000
committer Eric Wong <e@80x24.org> 2017-01-07 23:38:29 +0000
commit 68f83ca5236078128a0ccf47a1e54bd955777120 (patch)
tree f60805615cb69e97c7e79182d2b991f67f900210
parent 0753326bc4f148436b4c34dfb8133b5579de0198 (diff)
download public-inbox-68f83ca5236078128a0ccf47a1e54bd955777120.tar.gz
This will allow us to handle network operations while waiting
on "git cat-file" to seek and unpack things.
-rw-r--r-- MANIFEST 3
-rw-r--r-- lib/PublicInbox/Git.pm 89
-rw-r--r-- lib/PublicInbox/GitAsyncRd.pm 133
-rw-r--r-- lib/PublicInbox/GitAsyncWr.pm 23
-rw-r--r-- t/git_async.t 138
5 files changed, 386 insertions, 0 deletions
diff --git a/MANIFEST b/MANIFEST
index 59d44bcf..cc4882a1 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -55,6 +55,8 @@ lib/PublicInbox/Filter/Mirror.pm
 lib/PublicInbox/Filter/Vger.pm
 lib/PublicInbox/GetlineBody.pm
 lib/PublicInbox/Git.pm
+lib/PublicInbox/GitAsyncRd.pm
+lib/PublicInbox/GitAsyncWr.pm
 lib/PublicInbox/GitHTTPBackend.pm
 lib/PublicInbox/HTTP.pm
 lib/PublicInbox/HTTPD.pm
@@ -151,6 +153,7 @@ t/git-http-backend.psgi
 t/git-http-backend.t
 t/git.fast-import-data
 t/git.t
+t/git_async.t
 t/html_index.t
 t/httpd-corner.psgi
 t/httpd-corner.t
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 4dfc4099..0b482c7e 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -13,6 +13,10 @@ use POSIX qw(dup2);
 require IO::Handle;
 use PublicInbox::Spawn qw(spawn popen_rd);
 use Fcntl qw(:seek);
+my $have_async = eval {
+        require PublicInbox::EvCleanup;
+        require PublicInbox::GitAsyncRd;
+};
 
 # Documentation/SubmittingPatches recommends 12 (Linux v4.4)
 my $abbrev = `git config core.abbrev` || 12;
@@ -64,6 +68,7 @@ sub _bidi_pipe {
         $self->{$in} = $in_r;
 }
 
+# legacy synchronous API
 sub cat_file_begin {
         my ($self, $obj) = @_;
         $self->_bidi_pipe(qw(--batch in out pid));
@@ -79,6 +84,7 @@ sub cat_file_begin {
         ($in, $1, $2, $3);
 }
 
+# legacy synchronous API
 sub cat_file_finish {
         my ($self, $left) = @_;
         my $max = 8192;
@@ -96,6 +102,7 @@ sub cat_file_finish {
         fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
 }
 
+# legacy synchronous API
 sub cat_file {
         my ($self, $obj, $ref) = @_;
 
@@ -131,6 +138,7 @@ sub cat_file {
 
 sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) }
 
+# legacy synchronous API
 sub check {
         my ($self, $obj) = @_;
         $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
@@ -185,10 +193,91 @@ sub cleanup {
         my ($self) = @_;
         _destroy($self, qw(in out pid));
         _destroy($self, qw(in_c out_c pid_c));
+
+        if ($have_async) {
+                my %h = %$self; # yup, copy ourselves
+                %$self = ();
+                my $ds_closed;
+
+                # schedule closing with Danga::Socket::close:
+                foreach (qw(async async_c)) {
+                        my $ds = delete $h{$_} or next;
+                        $ds->close;
+                        $ds_closed = 1;
+                }
+
+                # can't do waitpid in _destroy() until next tick,
+                # since D::S defers closing until end of current event loop
+                $ds_closed and PublicInbox::EvCleanup::next_tick(sub {
+                        _destroy(\%h, qw(in_a out_a pid_a));
+                        _destroy(\%h, qw(in_ac out_ac pid_ac));
+                });
+        }
 }
 
 sub DESTROY { cleanup(@_) }
 
+# modern async API
+# Queue an asynchronous "--batch-check" lookup of $obj.
+# The --batch-check pipe pair and its PublicInbox::GitAsyncRd reader are
+# created on first use and cached in $self->{async_c}; $obj and $cb are
+# then handed to the reader's cat_file_async.
+sub check_async_ds ($$$) {
+        my ($self, $obj, $cb) = @_;
+        my $rd = $self->{async_c};
+        unless ($rd) {
+                _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac));
+                # trailing true argument puts the reader in "check" mode
+                $rd = PublicInbox::GitAsyncRd->new(@$self{qw(in_ac out_ac)}, 1);
+                $self->{async_c} = $rd;
+        }
+        $rd->cat_file_async($obj, $cb);
+}
+
+# Queue an asynchronous "--batch" retrieval of $obj.
+# The --batch pipe pair and its PublicInbox::GitAsyncRd reader are created
+# on first use and cached in $self->{async}; $obj and $cb are then handed
+# to the reader's cat_file_async.
+sub cat_async_ds ($$$) {
+        my ($self, $obj, $cb) = @_;
+        my $rd = $self->{async};
+        unless ($rd) {
+                _bidi_pipe($self, qw(--batch in_a out_a pid_a));
+                $rd = PublicInbox::GitAsyncRd->new(@$self{qw(in_a out_a)});
+                $self->{async} = $rd;
+        }
+        $rd->cat_file_async($obj, $cb);
+}
+
+# Read one LF-terminated line from the given handle and return an array
+# ref of its space-separated fields.
+sub async_info_compat ($) {
+        my ($in) = @_;
+        local $/ = "\n"; # do not depend on the caller's record separator
+        my $line = $in->getline;
+        chomp($line);
+        return [ split(/ /, $line) ];
+}
+
+# Blocking stand-in for check_async: writes $obj to the --batch-check
+# pipe, reads the one-line reply, and invokes $cb with the parsed fields
+# before returning.
+sub check_async_compat ($$$) {
+        my ($self, $obj, $cb) = @_;
+        $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
+        unless ($self->{out_c}->print($obj."\n")) {
+                fail($self, "write error: $!");
+        }
+        $cb->(async_info_compat($self->{in_c}));
+}
+
+# Blocking stand-in for cat_async.  $cb is invoked:
+#   - once with the array ref parsed from the info line,
+#   - then, when that line has exactly three fields, with a scalar ref
+#     for each chunk (at most 8192 bytes) of the body,
+#   - or with the false value returned by read() (undef or 0) if reading
+#     the body stops early.
+# After the body, one more byte is read and must be a newline.
+sub cat_async_compat ($$$) {
+        my ($self, $obj, $cb) = @_;
+        $self->_bidi_pipe(qw(--batch in out pid));
+        unless ($self->{out}->print($obj."\n")) {
+                fail($self, "write error: $!");
+        }
+        my $rd = $self->{in};
+        my $info = async_info_compat($rd);
+        $cb->($info);
+        return unless scalar(@$info) == 3; # missing
+        my $chunk_max = 8192;
+        my ($chunk, $nr);
+        for (my $remain = $info->[2]; $remain > 0; $remain -= $nr) {
+                my $want = $remain > $chunk_max ? $chunk_max : $remain;
+                $nr = read($rd, $chunk, $want);
+                return $cb->($nr) unless $nr; # undef or 0
+                $cb->(\$chunk);
+        }
+        $nr = read($rd, $chunk, 1);
+        fail($self, "read failed: $!") unless defined($nr);
+        if ($nr != 1 || $chunk ne "\n") {
+                fail($self, 'newline missing after blob');
+        }
+}
+
+# Choose the check_async/cat_async implementation once, at load time:
+# the GitAsyncRd-backed *_ds subs when $have_async is true, otherwise
+# the blocking *_compat subs.
+if ($have_async) {
+        *check_async = *check_async_ds;
+        *cat_async = *cat_async_ds;
+} else {
+        *check_async = *check_async_compat;
+        *cat_async = *cat_async_compat;
+}
+
 1;
 __END__
 =pod
diff --git a/lib/PublicInbox/GitAsyncRd.pm b/lib/PublicInbox/GitAsyncRd.pm
new file mode 100644
index 00000000..a56dc392
--- /dev/null
+++ b/lib/PublicInbox/GitAsyncRd.pm
@@ -0,0 +1,133 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# internal class used by PublicInbox::Git + Danga::Socket
+# This parses the output pipe of "git cat-file --batch/--batch-check"
+package PublicInbox::GitAsyncRd;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+use fields qw(jobq rbuf wr check);
+use PublicInbox::GitAsyncWr;
+our $MAX = 65536; # Import may bump this in the future
+
+# Arguments: ($class, $rd, $wr, $check)
+#   $rd    - read handle; switched to non-blocking and watched for reads
+#   $wr    - write handle; wrapped in a PublicInbox::GitAsyncWr
+#   $check - stored as {check}; event_read treats every job as info-only
+#            when it is true
+sub new {
+        my ($class, $rd, $wr, $check) = @_;
+        my $self = fields::new($class);
+        IO::Handle::blocking($rd, 0);
+        $self->SUPER::new($rd);
+        my $empty = '';
+        # jobq entries: [ $obj, $cb ], later extended by event_read
+        @$self{qw(jobq rbuf check)} = ([], \$empty, $check);
+        $self->{wr} = PublicInbox::GitAsyncWr->new($wr);
+        $self->watch_read(1);
+        return $self;
+}
+
+# Queue a request for $obj; $cb is invoked later from event_read (or
+# from close) with the results.
+sub cat_file_async {
+        my ($self, $obj, $cb) = @_;
+        my $job = [ $obj, $cb ];
+        # order matters: the job is queued before the request is written
+        push @{$self->{jobq}}, $job;
+        $self->{wr}->write("$obj\n");
+}
+
+# Returns: an array ref of the info line for --batch-check and --batch,
+# which may be: [ $obj, 'missing']
+# Returns 0 when sysread reports EOF and undef when it fails ($! is set)
+sub read_info ($) {
+        my ($self) = @_;
+        my $rbuf = $self->{rbuf};
+        my $rd = $self->{sock};
+
+        for (;;) {
+                # a complete line may already be buffered
+                if ($$rbuf =~ s/\A([^\n]+)\n//s) {
+                        return [ split(/ /, $1) ];
+                }
+
+                # append up to 110 more bytes and look again
+                my $nread = sysread($rd, $$rbuf, 110, length($$rbuf));
+                return $nread unless $nread;
+        }
+}
+
+# Read-readiness callback.  Works through {jobq} in order:
+#   1. read the info line for the job at the head of the queue and pass
+#      it to the job's callback (0 on EOF, undef on error; $! is set)
+#   2. unless in check mode or the info line lacks three fields, stream
+#      the body to the callback as scalar refs, then consume the
+#      trailing LF
+# Jobs are re-queued at the head on EAGAIN/EINTR or after a partial body
+# read, and the loop only moves on to the next job while buffered data
+# remains, so one reader cannot monopolize the event loop.
+sub event_read {
+        my ($self) = @_;
+        my $jobq = $self->{jobq};
+        my $check = $self->{check};
+
+        while (1) {
+                my $cur = shift @$jobq or
+                        die 'BUG: empty job queue in '.__PACKAGE__;
+                my (undef, $cb, $info, $left) = @$cur;
+                if (!$info) {
+                        $info = read_info($self);
+                        if (!defined $info && ($!{EAGAIN} || $!{EINTR})) {
+                                return unshift(@$jobq, $cur);
+                        }
+                        # $info may be 0 (EOF) or undef ($cb will see $!)
+                        $cb->($info);
+                        return $self->close unless $info;
+                        if ($check || (scalar(@$info) != 3)) {
+                                # do not monopolize the event loop if
+                                # we're drained:
+                                return if ${$self->{rbuf}} eq '';
+                                next;
+                        }
+                        $cur->[2] = $info;
+                        my $len = $info->[2];
+                        $left = \$len;
+                        $cur->[3] = $left; # onto reading body...
+                }
+                ref($left) or die 'BUG: $left not ref in '.__PACKAGE__;
+
+                my $rbuf = $self->{rbuf};
+                my $rlen = length($$rbuf);
+                my $need = $$left + 1; # +1 for trailing LF
+
+                # Becomes the next {rbuf}.  It must be a fresh scalar on
+                # every pass: $rbuf may point at the scalar installed by
+                # the previous pass, which still holds unread data.
+                my $rest = '';
+
+                if ($rlen < $need) {
+                        my $all = $need - $rlen;
+                        my $n = $all > $MAX ? $MAX : $all;
+                        my $r = sysread($self->{sock}, $$rbuf, $n, $rlen);
+                        if (!$r) {
+                                if (!defined $r && ($!{EAGAIN} || $!{EINTR})) {
+                                        return unshift(@$jobq, $cur);
+                                }
+                                # $cb should handle 0 and undef (and see $!)
+                                $cb->($r);
+                                return $self->close; # FAIL...
+                        }
+                        if ($r != $all) {
+                                # more to read later...  The callback gets
+                                # the previously buffered bytes as well as
+                                # the new ones, so account for both.
+                                $$left -= $rlen + $r;
+                                $self->{rbuf} = \$rest;
+                                $cb->($rbuf);
+
+                                # don't monopolize the event loop
+                                return unshift(@$jobq, $cur);
+                        }
+                } elsif ($rlen > $need) { # too much data in rbuf
+                        $rest = substr($$rbuf, $need, $rlen - $need);
+                        $$rbuf = substr($$rbuf, 0, $need);
+                }
+
+                # final hunk: $$rbuf is exactly the rest of the body + LF
+                $self->{rbuf} = \$rest;
+                $$left = undef;
+                my $lf = chop $$rbuf;
+                $lf eq "\n" or die "BUG: missing LF (got $lf)";
+                $cb->($rbuf);
+
+                return if $rest eq '';
+        }
+}
+
+# Empty the job queue, invoke each abandoned job's callback with 0,
+# close the writer, then close this reader via the parent class.
+sub close {
+        my ($self, @reason) = @_;
+        my $abandoned = $self->{jobq};
+        $self->{jobq} = [];
+        foreach my $job (@$abandoned) {
+                $job->[1]->(0);
+        }
+        $self->{wr}->close;
+        return $self->SUPER::close(@reason);
+}
+
+# hangup and error events both close this reader
+sub event_hup { my ($self) = @_; $self->close }
+sub event_err { my ($self) = @_; $self->close }
+
+1;
diff --git a/lib/PublicInbox/GitAsyncWr.pm b/lib/PublicInbox/GitAsyncWr.pm
new file mode 100644
index 00000000..c22f2fcc
--- /dev/null
+++ b/lib/PublicInbox/GitAsyncWr.pm
@@ -0,0 +1,23 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# internal class used by PublicInbox::Git + Danga::Socket
+# This writes to the input pipe of "git cat-file --batch/--batch-check"
+package PublicInbox::GitAsyncWr;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+
+# Wrap $io, switched to non-blocking mode, in a Danga::Socket object.
+sub new {
+        my ($class, $wr) = @_;
+        my $self = fields::new($class);
+        IO::Handle::blocking($wr, 0);
+        return $self->SUPER::new($wr);
+}
+
+# we only care about write + event_write
+
+# hangup and error events both close this writer
+sub event_hup { my ($self) = @_; $self->close }
+sub event_err { my ($self) = @_; $self->close }
+
+1;
diff --git a/t/git_async.t b/t/git_async.t
new file mode 100644
index 00000000..c20d48e3
--- /dev/null
+++ b/t/git_async.t
@@ -0,0 +1,138 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+$SIG{PIPE} = 'IGNORE';
+foreach my $mod (qw(Danga::Socket)) {
+        eval "require $mod";
+        plan skip_all => "$mod missing for git_async.t" if $@;
+}
+use File::Temp qw/tempdir/;
+use Cwd qw/getcwd/;
+my $tmpdir = tempdir('git_async-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+use_ok 'PublicInbox::Git';
+my $dir = "$tmpdir/git.git";
+# create a bare repository and populate it from t/git.fast-import-data
+{
+        my $init_rc = system(qw(git init -q --bare), $dir);
+        is($init_rc, 0, 'created git directory');
+        my @fast_import = ('git', "--git-dir=$dir", 'fast-import', '--quiet');
+        my $input = getcwd().'/t/git.fast-import-data';
+        ok(-r $input, "fast-import data readable (or run test at top level)");
+        my $child = fork;
+        die "fork failed: $!\n" unless defined $child;
+        unless ($child) {
+                # child: feed the data file to fast-import on stdin
+                open STDIN, '<', $input or die "open $input: $!\n";
+                exec @fast_import;
+                die "failed exec: ",join(' ', @fast_import),": $!\n";
+        }
+        waitpid $child, 0;
+        is($?, 0, 'fast-import succeeded');
+}
+
+# exercise check_async and cat_async under the Danga::Socket event loop
+{
+        my $f = 'HEAD:foo.txt';
+        my @args;
+        my $n = 0;
+        my $git = PublicInbox::Git->new($dir);
+        # keep looping while any GitAsync* object is still registered
+        Danga::Socket->SetPostLoopCallback(sub {
+                my ($fdmap) = @_;
+                foreach (values %$fdmap) {
+                        return 1 if ref($_) =~ /::GitAsync/;
+                }
+                0
+        });
+        $git->check_async($f, sub {
+                $n++;
+                @args = @_;
+                $git = undef;
+        });
+        Danga::Socket->EventLoop;
+        my @exp = PublicInbox::Git->new($dir)->check($f);
+        my $exp = [ \@exp ];
+        is_deeply(\@args, $exp, 'matches regular check');
+        is($n, 1, 'callback only called once');
+        $git = PublicInbox::Git->new($dir);
+        $n = 0;
+        my $max = 100;
+        my $m = 0;
+        for my $i (0..$max) {
+                my $k = "HEAD:m$i";
+                $git->check_async($k, sub {
+                        my ($info) = @_;
+                        ++$n;
+                        ++$m if $info->[1] eq 'missing' && $info->[0] eq $k;
+                });
+                # stop queueing once the writer starts buffering
+                if ($git->{async_c}->{wr}->{write_buf_size}) {
+                        diag("async_check capped at $i");
+                        $max = $i;
+                        last;
+                }
+        }
+        $git->check_async($f, sub { $git = undef });
+        Danga::Socket->EventLoop;
+        # The callbacks only run from inside the event loop, so the
+        # counters are meaningful only after it has finished.
+        is($n, $max + 1, 'every queued check ran its callback');
+        is($m, $n, 'everything expected missing is missing');
+
+        $git = PublicInbox::Git->new($dir);
+        my $info;
+        my $str = '';
+        my @missing;
+        $git->cat_async('HEAD:miss', sub {
+                my ($miss) = @_;
+                push @missing, $miss;
+        });
+        $git->cat_async($f, sub {
+                my $res = $_[0];
+                if (ref($res) eq 'ARRAY') {
+                        is($info, undef, 'info unset, setting..');
+                        $info = $res;
+                } elsif (ref($res) eq 'SCALAR') {
+                        $str .= $$res;
+                        if (length($str) >= $info->[2]) {
+                                is($info->[2], length($str), 'length match');
+                                $git = undef
+                        }
+                }
+        });
+        Danga::Socket->EventLoop;
+        is_deeply(\@missing, [['HEAD:miss', 'missing']], 'missing cat OK');
+        is($git, undef, 'git undefined');
+        $git = PublicInbox::Git->new($dir);
+        my $sref = $git->cat_file($f);
+        is($str, $$sref, 'matches synchronous version');
+        $git = undef;
+        Danga::Socket->RunTimers;
+}
+
+# exercise the blocking *_compat fallbacks directly
+{
+        my $git = PublicInbox::Git->new($dir);
+        foreach my $s (qw(check_async_compat cat_async_compat)) {
+                my @missing;
+                # call the method named by $s so both fallbacks are covered
+                $git->$s('HED:miss1ng', sub {
+                        my ($miss) = @_;
+                        push @missing, $miss;
+                });
+                is_deeply(\@missing, [['HED:miss1ng', 'missing']],
+                        "missing $s OK");
+        }
+        my @info;
+        my $str = '';
+        $git->cat_async_compat('HEAD:foo.txt', sub {
+                my $ref = $_[0];
+                my $t = ref $ref;
+                if ($t eq 'ARRAY') {
+                        push @info, $ref;
+                } elsif ($t eq 'SCALAR') {
+                        $str .= $$ref;
+                } else {
+                        fail "fail type: $t";
+                }
+        });
+        is_deeply(\@info, [ [ 'bf4f17855632367a160bef055fc8ba4675d10e6b',
+                               'blob', 18 ]], 'info matches compat');
+        is($str, "-----\nhello\nworld\n", 'data matches compat');
+}
+
+done_testing();
+
+1;