about summary refs log tree commit homepage
path: root/lib/PublicInbox/Git.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Git.pm')
-rw-r--r--lib/PublicInbox/Git.pm193
1 files changed, 170 insertions, 23 deletions
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 59c27470..893df71e 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -12,12 +12,44 @@ use warnings;
 use POSIX qw(dup2);
 require IO::Handle;
 use PublicInbox::Spawn qw(spawn popen_rd);
+use Fcntl qw(:seek);
+my $have_async = eval {
+        require PublicInbox::EvCleanup;
+        require PublicInbox::GitAsync;
+};
 
 sub new {
         my ($class, $git_dir) = @_;
         bless { git_dir => $git_dir }, $class
 }
 
+sub err_begin ($) {
+        my $err = $_[0]->{err};
+        unless ($err) {
+                open($err, '+>', undef);
+                $_[0]->{err} = $err;
+        }
+        sysseek($err, 0, SEEK_SET) or die "sysseek failed: $!";
+        truncate($err, 0) or die "truncate failed: $!";
+        my $ret = fileno($err);
+        defined $ret or die "fileno failed: $!";
+        $ret;
+}
+
+sub err ($) {
+        my $err = $_[0]->{err} or return '';
+        sysseek($err, 0, SEEK_SET) or die "sysseek failed: $!";
+        defined(sysread($err, my $buf, -s $err)) or die "sysread failed: $!";
+        sysseek($err, 0, SEEK_SET) or die "sysseek failed: $!";
+        truncate($err, 0) or die "truncate failed: $!";
+        $buf;
+}
+
+sub cmd {
+        my $self = shift;
+        [ 'git', "--git-dir=$self->{git_dir}", @_ ];
+}
+
 sub _bidi_pipe {
         my ($self, $batch, $in, $out, $pid) = @_;
         return if $self->{$pid};
@@ -26,9 +58,8 @@ sub _bidi_pipe {
         pipe($in_r, $in_w) or fail($self, "pipe failed: $!");
         pipe($out_r, $out_w) or fail($self, "pipe failed: $!");
 
-        my @cmd = ('git', "--git-dir=$self->{git_dir}", qw(cat-file), $batch);
         my $redir = { 0 => fileno($out_r), 1 => fileno($in_w) };
-        my $p = spawn(\@cmd, undef, $redir);
+        my $p = spawn(cmd($self, qw(cat-file), $batch), undef, $redir);
         defined $p or fail($self, "spawn failed: $!");
         $self->{$pid} = $p;
         $out_w->autoflush(1);
@@ -36,20 +67,46 @@ sub _bidi_pipe {
         $self->{$in} = $in_r;
 }
 
-sub cat_file {
-        my ($self, $obj, $ref) = @_;
-
-        batch_prepare($self);
+# legacy synchronous API
+sub cat_file_begin {
+        my ($self, $obj) = @_;
+        $self->_bidi_pipe(qw(--batch in out pid));
         $self->{out}->print($obj, "\n") or fail($self, "write error: $!");
 
         my $in = $self->{in};
         local $/ = "\n";
         my $head = $in->getline;
         $head =~ / missing$/ and return undef;
-        $head =~ /^[0-9a-f]{40} \S+ (\d+)$/ or
+        $head =~ /^([0-9a-f]{40}) (\S+) (\d+)$/ or
                 fail($self, "Unexpected result from git cat-file: $head");
 
-        my $size = $1;
+        ($in, $1, $2, $3);
+}
+
+# legacy synchronous API
+sub cat_file_finish {
+        my ($self, $left) = @_;
+        my $max = 8192;
+        my $in = $self->{in};
+        my $buf;
+        while ($left > 0) {
+                my $r = read($in, $buf, $left > $max ? $max : $left);
+                defined($r) or fail($self, "read failed: $!");
+                $r == 0 and fail($self, 'exited unexpectedly');
+                $left -= $r;
+        }
+
+        my $r = read($in, $buf, 1);
+        defined($r) or fail($self, "read failed: $!");
+        fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
+}
+
+# legacy synchronous API
+sub cat_file {
+        my ($self, $obj, $ref) = @_;
+
+        my ($in, $hex, $type, $size) = $self->cat_file_begin($obj);
+        return unless $in;
         my $ref_type = $ref ? ref($ref) : '';
 
         my $rv;
@@ -58,16 +115,8 @@ sub cat_file {
         my $cb_err;
 
         if ($ref_type eq 'CODE') {
-                $rv = eval { $ref->($in, \$left) };
+                $rv = eval { $ref->($in, \$left, $type, $hex) };
                 $cb_err = $@;
-                # drain the rest
-                my $max = 8192;
-                while ($left > 0) {
-                        my $r = read($in, my $x, $left > $max ? $max : $left);
-                        defined($r) or fail($self, "read failed: $!");
-                        $r == 0 and fail($self, 'exited unexpectedly');
-                        $left -= $r;
-                }
         } else {
                 my $offset = 0;
                 my $buf = '';
@@ -80,10 +129,7 @@ sub cat_file {
                 }
                 $rv = \$buf;
         }
-
-        my $r = read($in, my $buf, 1);
-        defined($r) or fail($self, "read failed: $!");
-        fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
+        $self->cat_file_finish($left);
         die $cb_err if $cb_err;
 
         $rv;
@@ -91,6 +137,7 @@ sub cat_file {
 
 sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) }
 
+# legacy synchronous API
 sub check {
         my ($self, $obj) = @_;
         $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
@@ -119,8 +166,16 @@ sub fail {
 
 sub popen {
         my ($self, @cmd) = @_;
-        @cmd = ('git', "--git-dir=$self->{git_dir}", @cmd);
-        popen_rd(\@cmd);
+        my $cmd = cmd($self);
+        my ($env, $opt);
+        if (ref $cmd[0]) {
+                push @$cmd, @{$cmd[0]};
+                $env = $cmd[1];
+                $opt = $cmd[2];
+        } else {
+                push @$cmd, @cmd;
+        }
+        popen_rd($cmd, $env, $opt);
 }
 
 sub qx {
@@ -137,10 +192,102 @@ sub cleanup {
         my ($self) = @_;
         _destroy($self, qw(in out pid));
         _destroy($self, qw(in_c out_c pid_c));
+
+        if ($have_async) {
+                my %h = %$self; # yup, copy ourselves
+                %$self = ();
+                my $ds_closed;
+
+                # schedule closing with Danga::Socket::close:
+                foreach (qw(async async_c)) {
+                        my $ds = delete $h{$_} or next;
+                        $ds->close;
+                        $ds_closed = 1;
+                }
+
+                # can't do waitpid in _destroy() until next tick,
+                # since D::S defers closing until end of current event loop
+                $ds_closed and PublicInbox::EvCleanup::next_tick(sub {
+                        _destroy(\%h, qw(in_a out_a pid_a));
+                        _destroy(\%h, qw(in_ac out_ac pid_ac));
+                });
+        }
 }
 
 sub DESTROY { cleanup(@_) }
 
+# modern async API
+sub check_async_ds ($$$) {
+        my ($self, $obj, $cb) = @_;
+        ($self->{async_c} ||= do {
+                _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac));
+                PublicInbox::GitAsync->new($self->{in_ac}, $self->{out_ac}, 1);
+        })->cat_file_async($obj, $cb);
+}
+
+sub cat_async_ds ($$$) {
+        my ($self, $obj, $cb) = @_;
+        ($self->{async} ||= do {
+                _bidi_pipe($self, qw(--batch in_a out_a pid_a));
+                PublicInbox::GitAsync->new($self->{in_a}, $self->{out_a});
+        })->cat_file_async($obj, $cb);
+}
+
+sub async_info_compat ($) {
+        local $/ = "\n";
+        chomp(my $line = $_[0]->getline);
+        [ split(/ /, $line) ];
+}
+
+sub check_async_compat ($$$) {
+        my ($self, $obj, $cb) = @_;
+        $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
+        $self->{out_c}->print($obj."\n") or fail($self, "write error: $!");
+        my $info = async_info_compat($self->{in_c});
+        $cb->($info);
+}
+
+sub cat_async_compat ($$$) {
+        my ($self, $obj, $cb) = @_;
+        $self->_bidi_pipe(qw(--batch in out pid));
+        $self->{out}->print($obj."\n") or fail($self, "write error: $!");
+        my $in = $self->{in};
+        my $info = async_info_compat($in);
+        my (undef, $type, $left) = @$info;
+        $cb->($info);
+        return if $info->[1] eq 'missing';
+        my $max = 8192;
+        my ($buf, $r);
+        while ($left > 0) {
+                $r = read($in, $buf, $left > $max ? $max : $left);
+                return $cb->($r) unless $r; # undef or 0
+                $left -= $r;
+                $cb->(\$buf);
+        }
+        $r = read($in, $buf, 1);
+        defined($r) or fail($self, "read failed: $!");
+        fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
+        $cb->(0);
+}
+
+sub check_async {
+        my ($self, $env, $obj, $cb) = @_;
+        if ($env->{'pi-httpd.async'}) {
+                check_async_ds($self, $obj, $cb);
+        } else {
+                check_async_compat($self, $obj, $cb);
+        }
+}
+
+sub cat_async {
+        my ($self, $env, $obj, $cb) = @_;
+        if ($env->{'pi-httpd.async'}) {
+                cat_async_ds($self, $obj, $cb);
+        } else {
+                cat_async_compat($self, $obj, $cb);
+        }
+}
+
 1;
 __END__
 =pod