#!perl -w
# Copyright (C) all contributors
# License: AGPL-3.0+
use v5.12;
use PublicInbox::TestCommon;
require_mods(qw(DBD::SQLite Xapian +SCM_RIGHTS)); # TODO: FIFO support?
use PublicInbox::Spawn qw(spawn);
use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM);
require PublicInbox::AutoReap;
use PublicInbox::IPC;
require PublicInbox::XapClient;
use PublicInbox::DS qw(now);
use autodie;
# Temporary directory for this test; it later holds the v2 inbox and the
# root2id file.
my ($tmp, $for_destroy) = tmpdir();

# Input for `git fast-import', plus a handle on the starting directory so
# the callback below can chdir back to it via the handle.
open my $import_fh, '<', './t/git.fast-import-data';
open my $start_dir, '<', '.';

# Populate a bare git repository and index it twice with -cindex:
# once with -j1 and no -d, and once with -j3 into "$git_dir/cidx-ext".
my $crepo = create_coderepo 'for-cindex', sub {
	my ($git_dir) = @_;
	xsys_e(['git', 'init', '-q', '--bare']);
	xsys_e(['git', 'fast-import', '--quiet'], undef, { 0 => $import_fh });
	chdir($start_dir);
	my @cindex = qw(-cindex --dangerous -L medium --no-fsync -q);
	run_script([@cindex, '-j1', '-g', $git_dir])
		or xbail '-cindex internal';
	run_script([@cindex, qw(-j3 -d), "$git_dir/cidx-ext", '-g', $git_dir])
		or xbail '-cindex "external"';
};

# Neither handle is needed once the repository exists.
($import_fh, $start_dir) = ();
# A medium-indexed v2 inbox under "$tmp/v2" holding six sample messages.
# The queries further down expect exactly these messages (mset.size=6).
my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
			tmpdir => "$tmp/v2", sub {
	my ($im) = @_;
	my @samples = qw(
		t/data/0001.patch
		t/data/binary.patch
		t/data/message_embed.eml
		t/solve/0001-simple-mod.patch
		t/solve/0002-rename-with-modifications.patch
		t/solve/bare.patch
	);
	foreach my $path (@samples) {
		# a message which cannot be added makes the rest meaningless
		$im->add(eml_load($path)) or BAIL_OUT;
	}
};
# Shard directories of the inbox, and the same list expanded into
# repeated "-d DIR" pairs for helper requests.
my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
my @ibx_shard_args = map +('-d' => $_), @ibx_idx;

# Shard directories of the two code indexes built above.
my @int = glob("$crepo/public-inbox-cindex/cidx*/?");
my @ext = glob("$crepo/cidx-ext/cidx*/?");
is scalar(@ext), 2, 'have 2 external shards' or diag explain \@ext;
is scalar(@int), 1, 'have 1 internal shard' or diag explain \@int;
# $doreq->($sock, @arg [, $err_fh])
#
# Send one request over $sock and return the read end of a fresh pipe.
# The request is the NUL-terminated, NUL-joined @arg; the write end of
# the pipe and an error handle are passed along as file descriptors.
# A trailing reference in @arg is taken as the error handle, otherwise
# STDERR is used.  Bails out of the test on a failed or short send.
my $doreq = sub {
	my ($sock, @arg) = @_;
	my $err = \*STDERR;
	$err = pop(@arg) if ref($arg[-1]);
	pipe(my $rd, my $wr);
	my $req = join("\0", @arg, '');
	my $sent = $PublicInbox::IPC::send_cmd->($sock,
			[ fileno($wr), fileno($err) ], $req, 0);
	defined($sent) or xbail "send: $!";
	my $want = length($req);
	$sent == $want or xbail "req @arg sent short ($sent != $want)";
	$rd;
};
# Ignore SIGPIPE for the remainder of the script.
local $SIG{PIPE} = 'IGNORE';

# Spawned helpers get the current @INC as their module search path.
my $env = { PERL5LIB => join(':', @INC) };

# $test->(@cmd)
#
# Spawn @cmd as a helper with one end of a SOCK_SEQPACKET socketpair as
# its stdin, then exercise it through $doreq:
#   - test_inspect against an inbox shard and a code index shard
#   - dump_ibx, with and without a dedicated error pipe
#   - worker management via SIGTERM/SIGTTIN/SIGTTOU; skipped when the
#     PID answering requests is the spawned PID itself
#
# Returns the PublicInbox::AutoReap object for the helper in scalar
# context, or ($ar, $socket) in list context.
my $test = sub {
	my (@cmd) = @_;
	socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
	my $pid = spawn(\@cmd, $env, { 0 => $y });
	my $ar = PublicInbox::AutoReap->new($pid);
	diag "$cmd[-1] running pid=$pid";
	close $y; # the child holds its own copy

	# test_inspect replies with space-separated key=value pairs
	my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
	my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
	is($info{has_threadid}, '1', 'has_threadid true for inbox');
	like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect');

	$r = $doreq->($s, qw(test_inspect -d), $int[0]);
	my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
	is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
	is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');

	# every chunk read from the pipe must end on a line boundary
	my @dump = (qw(dump_ibx -A XDFID), @ibx_shard_args, qw(13 rt:0..));
	$r = $doreq->($s, @dump);
	my @res;
	while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
	is(grep(/\n\z/s, @res), scalar(@res), 'line buffered');

	# same request again, capturing the statistics sent to the error fd
	pipe(my $err_rd, my $err_wr);
	$r = $doreq->($s, @dump, $err_wr);
	close $err_wr; # otherwise reading $err_rd never sees EOF
	my $res = do { local $/; <$r> };
	is(join('', @res), $res, 'got identical response w/ error pipe');
	my $stats = do { local $/; <$err_rd> };
	is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported') or
		diag "res=$res";

	# the spawned process answers requests itself: no workers to manage
	return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid;

	# test worker management:
	kill('TERM', $cinfo{pid});
	my $tries = 0;
	do {
		$r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
		%info = map { split(/=/, $_, 2) }
			split(/ /, do { local $/; <$r> });
	} while ($info{pid} == $cinfo{pid} && ++$tries < 10);
	isnt($info{pid}, $cinfo{pid}, 'spawned new worker');

	# after SIGTTIN, requests should be answered by two distinct PIDs
	my %pids;
	$tries = 0;
	my @ins = ($s, qw(test_inspect -d), $ibx_idx[0]);
	kill('TTIN', $pid);
	until (scalar(keys %pids) >= 2 || ++$tries > 100) {
		tick;
		my @r = map { $doreq->(@ins) } (0..100);
		for my $fh (@r) {
			my $buf = do { local $/; <$fh> } // die "read: $!";
			$buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
		}
	}
	# explain() must be called in list context: concatenating its
	# result would only yield an element count
	is(scalar keys %pids, 2, 'have two pids') or
		diag 'pids=', explain(\%pids);

	# after SIGTTOU, only one PID should keep answering
	kill('TTOU', $pid);
	%pids = ();
	my $delay = $tries * 0.11 * ($ENV{VALGRIND} ? 10 : 1);
	$tries = 0;
	diag 'waiting '.$delay.'s for SIGTTOU';
	tick($delay);
	until (scalar(keys %pids) == 1 || ++$tries > 100) {
		%pids = ();
		my @r = map { $doreq->(@ins) } (0..100);
		for my $fh (@r) {
			my $buf = do { local $/; <$fh> } // die "read: $!";
			$buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
		}
	}
	is(scalar keys %pids, 1, 'have one pid') or diag explain(\%pids);
	is($info{pid}, (keys %pids)[0], 'kept oldest PID after TTOU');
	wantarray ? ($ar, $s) : $ar;
};
# Values of PI_NO_CXX to exercise in the client loop below; widened
# once the C++ helper is known to be usable.
my @NO_CXX = (1);

# Pure-Perl helper, run with -j0 and -j1; skipped on request.
if (!$ENV{TEST_XH_CXX_ONLY}) {
	my @perl_xh = ($^X, qw(-w -MPublicInbox::XapHelper -e));
	my $ar = $test->(@perl_xh, q[PublicInbox::XapHelper::start('-j0')]);
	# $s keeps the socket open while no_pollerfd() runs
	($ar, my $s) = $test->(@perl_xh,
				q[PublicInbox::XapHelper::start('-j1')]);
	no_pollerfd($ar->{pid});
}

# C++ helper, run with -j0 and -j1 when its command can be obtained.
SKIP: {
	my $cxx_cmd = eval {
		require PublicInbox::XapHelperCxx;
		PublicInbox::XapHelperCxx::cmd();
	};
	skip("XapHelperCxx build: $@", 1) if $@;
	@NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? (0) : (0, 1);
	my $ar;
	for my $jobs (qw(-j0 -j1)) {
		$ar = $test->(@$cxx_cmd, $jobs);
	}
};
require PublicInbox::CodeSearch;
my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex");

# Write "TERM\0ID\0" records for every 'G' term of the internal code
# index, numbering them from 0 in the order all_terms returns them;
# @id2root is the reverse mapping (ID => TERM).
my $root2id_file = "$tmp/root2id";
my @id2root = $cs_int->all_terms('G');
{
	open my $out, '>', $root2id_file;
	for my $id (0..$#id2root) {
		print {$out} $id2root[$id], "\0", $id, "\0";
	}
	close $out;
}
my $ar;

# "-d DIR" pairs for the internal code index shards; identical for every
# request below, so build them once.
my @int_shard_args = map { ('-d', $_) } @int;

# Exercise PublicInbox::XapClient against each helper implementation
# selected above via PI_NO_CXX.
for my $n (@NO_CXX) {
	local $ENV{PI_NO_CXX} = $n;
	my $xhc = PublicInbox::XapClient::start_helper('-j0');
	pipe(my $err_r, my $err_w);

	# git patch-id --stable <t/data/0001.patch | awk '{print $1}'
	# NOTE(review): the declarations of $dfid, $mid and $r had been
	# fused into a truncated comment; the two literals below are
	# restored from the upstream test -- confirm them against the
	# command above and the Message-ID of t/data/0001.patch.
	my $dfid = '91ee6b761fc7f47cad9f2b09b10489f313eb5b71';
	my $mid = '20180720072141.GA15957@example';
	my $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q),
				@ibx_shard_args, 9, "mid:$mid");
	close $err_w; # otherwise reading $err_r never sees EOF
	my $res = do { local $/; <$r> };
	is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
	my $err = do { local $/; <$err_r> };
	is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");

	# dump_roots against the internal code index
	pipe($err_r, $err_w);
	$r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
				@int_shard_args,
				$root2id_file, 'dt:19700101'.'000000..');
	close $err_w;
	my @res = <$r>;
	is(scalar(@res), 5, 'got expected rows');
	is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
		'entries match format');
	$err = do { local $/; <$err_r> };
	is $err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})";

	# mset: a header line followed by one NUL-separated row per match
	$r = $xhc->mkreq([], qw(mset), @ibx_shard_args,
				'dfn:lib/PublicInbox/Search.pm');
	chomp((my $hdr, @res) = readline($r));
	like $hdr, qr/\bmset\.size=1\b/,
		"got expected header via mset ($xhc->{impl})";
	is scalar(@res), 1, 'got one result';
	@res = split /\0/, $res[0];
	{
		# the first column must be a docid usable with the inbox
		my $doc = $v2->search->xdb->get_document($res[0]);
		ok $doc, 'valid document retrieved';
		my @q = PublicInbox::Search::xap_terms('Q', $doc);
		is_deeply \@q, [ $mid ], 'docid usable';
	}
	ok $res[1] > 0 && $res[1] <= 100, 'pct > 0 && <= 100';
	is scalar(@res), 3, 'only 3 columns in result';

	$r = $xhc->mkreq([], qw(mset), @ibx_shard_args,
				'dt:19700101'.'000000..');
	chomp(($hdr, @res) = readline($r));
	like $hdr, qr/\bmset\.size=6\b/,
		"got expected header via multi-result mset ($xhc->{impl})";
	is(scalar(@res), 6, 'got 6 rows');
	for my $row (@res) {
		my ($docid, $pct, $rank, @rest) = split /\0/, $row;
		# presumably throws if $docid is unknown -- TODO confirm
		my $doc = $v2->search->xdb->get_document($docid);
		ok $pct > 0 && $pct <= 100,
			"pct > 0 && <= 100 #$docid ($xhc->{impl})";
		like $rank, qr/\A\d+\z/, 'rank is a digit';
		is scalar(@rest), 0, 'no extra rows returned';
	}

	# XDFPOST$i: every length must yield the same number of rows, and
	# every emitted OID must be exactly $i characters long
	my $nr;
	for my $i (7, 8, 39, 40) {
		pipe($err_r, $err_w);
		$r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A),
				"XDFPOST$i", @int_shard_args,
				$root2id_file, 'dt:19700101'.'000000..');
		close $err_w;
		@res = <$r>;
		my @err = <$err_r>;
		if (defined $nr) {
			is scalar(@res), $nr,
				"got expected results ($xhc->{impl})";
		} else {
			$nr //= scalar @res;
			ok $nr, "got initial results ($xhc->{impl})";
		}
		my @oids = (join('', @res) =~ /^([a-f0-9]+) /gms);
		is_deeply [grep { length == $i } @oids], \@oids,
			"all OIDs match expected length ($xhc->{impl})";
		my ($nr_out) = ("@err" =~ /nr_out=(\d+)/);
		is $nr_out, scalar(@oids), "output count matches $xhc->{impl}"
			or diag explain(\@res, \@err);
	}

	# same consistency check for dump_ibx with 7-character OIDs
	pipe($err_r, $err_w);
	$r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFPOST7),
			@ibx_shard_args, qw(13 rt:0..));
	close $err_w;
	@res = <$r>;
	my @err = <$err_r>;
	my ($nr_out) = ("@err" =~ /nr_out=(\d+)/);
	my @oids = (join('', @res) =~ /^([a-f0-9]{7}) /gms);
	is $nr_out, scalar(@oids), "output count matches $xhc->{impl}" or
		diag explain(\@res, \@err);

	if ($ENV{TEST_XH_TIMEOUT}) {
		diag 'testing timeouts...';
		for my $j (qw(0 1)) {
			my $t0 = now();
			$r = $xhc->mkreq(undef, qw(test_sleep -K 1 -d),
					$ibx_idx[0]);
			is readline($r), undef, 'got EOF';
			# explicit parens: never parse as now(-$t0)
			my $diff = now() - $t0;
			ok $diff < 3, "timeout didn't take too long -j$j";
			ok $diff >= 0.9, "timeout didn't fire prematurely -j$j";
			# the next iteration runs against a -j1 helper
			$xhc = PublicInbox::XapClient::start_helper('-j1');
		}
	}
}
done_testing;