# Copyright (C) all contributors
# License: AGPL-3.0+
# Perl + SWIG||XS implementation if XapHelperCxx / xap_helper.h isn't usable.
package PublicInbox::XapHelper;
use v5.12;
use Getopt::Long (); # good API even if we only use short options
our $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
use PublicInbox::Search qw(xap_terms);
use PublicInbox::CodeSearch;
use PublicInbox::IPC;
use PublicInbox::IO qw(read_all);
use Socket qw(SOL_SOCKET SO_TYPE SOCK_SEQPACKET AF_UNIX);
use PublicInbox::DS qw(awaitpid);
use autodie qw(open getsockopt);
use POSIX qw(:signal_h);
use Fcntl qw(LOCK_UN LOCK_EX);
use Carp qw(croak);
my $X = \%PublicInbox::Search::X;
our (%SRCH, %WORKERS, $nworker, $workerset, $in);
our $stderr = \*STDERR;
sub cmd_test_inspect {
my ($req) = @_;
print { $req->{0} } "pid=$$ has_threadid=",
($req->{srch}->has_threadid ? 1 : 0)
}
sub cmd_test_sleep { select(undef, undef, undef, 0.01) while 1 }
sub iter_retry_check ($) {
if (ref($@) =~ /\bDatabaseModifiedError\b/) {
$_[0]->{srch}->reopen;
undef; # retries
} elsif (ref($@) =~ /\bDocNotFoundError\b/) {
warn "doc not found: $@";
0; # continue to next doc
} else {
die;
}
}
sub term_length_extract ($) {
my ($req) = @_;
@{$req->{A_len}} = map {
my $len = s/([0-9]+)\z// ? ($1 + 0) : undef;
[ $_, $len ];
} @{$req->{A}};
}
sub dump_ibx_iter ($$$) {
my ($req, $ibx_id, $it) = @_;
my $out = $req->{0};
eval {
my $doc = $it->get_document;
for my $pair (@{$req->{A_len}}) {
my ($pfx, $len) = @$pair;
my @t = xap_terms($pfx, $doc);
@t = grep { length == $len } @t if defined($len);
for (@t) {
print $out "$_ $ibx_id\n" or die "print: $!";
++$req->{nr_out};
}
}
};
$@ ? iter_retry_check($req) : 0;
}
sub emit_mset_stats ($$) {
my ($req, $mset) = @_;
my $err = $req->{1} or croak "BUG: caller only passed 1 FD";
say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
}
sub cmd_dump_ibx {
my ($req, $ibx_id, $qry_str) = @_;
$qry_str // die 'usage: dump_ibx [OPTIONS] IBX_ID QRY_STR';
$req->{A} or die 'dump_ibx requires -A PREFIX';
term_length_extract $req;
my $max = $req->{'m'} // $req->{srch}->{xdb}->get_doccount;
my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
$opt->{eidx_key} = $req->{O} if defined $req->{O};
my $mset = $req->{srch}->mset($qry_str, $opt);
$req->{0}->autoflush(1);
for my $it ($mset->items) {
for (my $t = 10; $t > 0; --$t) {
$t = dump_ibx_iter($req, $ibx_id, $it) // $t;
}
}
emit_mset_stats($req, $mset);
}
sub dump_roots_iter ($$$) {
my ($req, $root2off, $it) = @_;
eval {
my $doc = $it->get_document;
my $G = join(' ', map { $root2off->{$_} } xap_terms('G', $doc));
for my $pair (@{$req->{A_len}}) {
my ($pfx, $len) = @$pair;
my @t = xap_terms($pfx, $doc);
@t = grep { length == $len } @t if defined($len);
for (@t) {
$req->{wbuf} .= "$_ $G\n";
++$req->{nr_out};
}
}
};
$@ ? iter_retry_check($req) : 0;
}
sub dump_roots_flush ($$) {
my ($req, $fh) = @_;
if ($req->{wbuf} ne '') {
until (flock($fh, LOCK_EX)) { die "LOCK_EX: $!" if !$!{EINTR} }
print { $req->{0} } $req->{wbuf} or die "print: $!";
until (flock($fh, LOCK_UN)) { die "LOCK_UN: $!" if !$!{EINTR} }
$req->{wbuf} = '';
}
}
sub cmd_dump_roots {
my ($req, $root2off_file, $qry_str) = @_;
$qry_str // die 'usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR';
$req->{A} or die 'dump_roots requires -A PREFIX';
term_length_extract $req;
open my $fh, '<', $root2off_file;
my $root2off; # record format: $OIDHEX "\0" uint32_t
my @x = split(/\0/, read_all $fh);
while (defined(my $oidhex = shift @x)) {
$root2off->{$oidhex} = shift @x;
}
my $opt = { relevance => -1, limit => $req->{'m'},
offset => $req->{o} // 0 };
my $mset = $req->{srch}->mset($qry_str, $opt);
$req->{0}->autoflush(1);
$req->{wbuf} = '';
for my $it ($mset->items) {
for (my $t = 10; $t > 0; --$t) {
$t = dump_roots_iter($req, $root2off, $it) // $t;
}
if (!($req->{nr_out} & 0x3fff)) {
dump_roots_flush($req, $fh);
}
}
dump_roots_flush($req, $fh);
emit_mset_stats($req, $mset);
}
sub mset_iter ($$) {
my ($req, $it) = @_;
say { $req->{0} } $it->get_docid, "\0",
$it->get_percent, "\0", $it->get_rank;
}
sub cmd_mset { # to be used by WWW + IMAP
my ($req, $qry_str) = @_;
$qry_str // die 'usage: mset [OPTIONS] QRY_STR';
my $opt = { limit => $req->{'m'}, offset => $req->{o} // 0 };
$opt->{relevance} = 1 if $req->{r};
$opt->{threads} = 1 if defined $req->{t};
$opt->{git_dir} = $req->{g} if defined $req->{g};
$opt->{eidx_key} = $req->{O} if defined $req->{O};
$opt->{threadid} = $req->{T} if defined $req->{T};
my $mset = $req->{srch}->mset($qry_str, $opt);
say { $req->{0} } 'mset.size=', $mset->size,
' .get_matches_estimated=', $mset->get_matches_estimated;
for my $it ($mset->items) {
for (my $t = 10; $t > 0; --$t) {
$t = mset_iter($req, $it) // $t;
}
}
}
sub srch_init_extra ($) {
my ($req) = @_;
my $qp = $req->{srch}->{qp};
for (@{$req->{Q}}) {
my ($upfx, $m, $xpfx) = split /([:=])/;
$xpfx // die "E: bad -Q $_";
$m = $m eq '=' ? 'add_boolean_prefix' : 'add_prefix';
$qp->$m($upfx, $xpfx);
}
$req->{srch}->{qp_extra_done} = 1;
}
sub dispatch {
my ($req, $cmd, @argv) = @_;
my $fn = $req->can("cmd_$cmd") or return;
$GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC)
or return;
my $dirs = delete $req->{d} or die 'no -d args';
my $key = join("\0", @$dirs);
my $new;
$req->{srch} = $SRCH{$key} //= do {
$new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
my $first = shift @$dirs;
my $slow_phrase = -f "$first/iamchert";
$new->{xdb} = $X->{Database}->new($first);
for (@$dirs) {
$slow_phrase ||= -f "$_/iamchert";
$new->{xdb}->add_database($X->{Database}->new($_));
}
$slow_phrase or
$new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE();
bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
'PublicInbox::Search';
$new->{qp} = $new->qparse_new;
$new;
};
$req->{srch}->{xdb}->reopen unless $new;
$req->{Q} && !$req->{srch}->{qp_extra_done} and
srch_init_extra $req;
my $timeo = $req->{K};
alarm($timeo) if $timeo;
$fn->($req, @argv);
alarm(0) if $timeo;
}
sub recv_loop {
local $SIG{__WARN__} = sub { print $stderr @_ };
my $rbuf;
local $SIG{TERM} = sub { undef $in };
local $SIG{USR1} = \&reopen_logs;
while (defined($in)) {
PublicInbox::DS::sig_setmask($workerset);
my @fds = eval { # we undef $in in SIG{TERM}
$PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33)
};
if ($@) {
exit if !$in; # hit by SIGTERM
die;
}
scalar(@fds) or exit(66); # EX_NOINPUT
die "recvmsg: $!" if !defined($fds[0]);
PublicInbox::DS::block_signals(POSIX::SIGALRM);
my $req = bless {}, __PACKAGE__;
my $i = 0;
open($req->{$i++}, '+<&=', $_) for @fds;
local $stderr = $req->{1} // \*STDERR;
die "not NUL-terminated" if chop($rbuf) ne "\0";
my @argv = split(/\0/, $rbuf);
$req->{nr_out} = 0;
$req->dispatch(@argv) if @argv;
}
}
sub reap_worker { # awaitpid CB
my ($pid, $nr) = @_;
delete $WORKERS{$nr};
if (($? >> 8) == 66) { # EX_NOINPUT
undef $in;
} elsif ($?) {
warn "worker[$nr] died \$?=$?\n";
}
PublicInbox::DS::requeue(\&start_workers) if $in;
}
sub start_worker ($) {
my ($nr) = @_;
my $pid = eval { PublicInbox::DS::fork_persist } // return(warn($@));
if ($pid == 0) {
undef %WORKERS;
$SIG{TTIN} = $SIG{TTOU} = 'IGNORE';
$SIG{CHLD} = 'DEFAULT'; # Xapian may use this
recv_loop();
exit(0);
} else {
$WORKERS{$nr} = $pid;
awaitpid($pid, \&reap_worker, $nr);
}
}
sub start_workers {
for my $nr (grep { !defined($WORKERS{$_}) } (0..($nworker - 1))) {
start_worker($nr) if $in;
}
}
sub do_sigttou {
if ($in && $nworker > 1) {
--$nworker;
my @nr = grep { $_ >= $nworker } keys %WORKERS;
kill('TERM', @WORKERS{@nr});
}
}
sub reopen_logs {
my $p = $ENV{STDOUT_PATH};
defined($p) && open(STDOUT, '>>', $p) and STDOUT->autoflush(1);
$p = $ENV{STDERR_PATH};
defined($p) && open(STDERR, '>>', $p) and STDERR->autoflush(1);
}
sub parent_reopen_logs {
reopen_logs();
kill('USR1', values %WORKERS);
}
sub xh_alive { $in || scalar(keys %WORKERS) }
sub start (@) {
my (@argv) = @_;
my $c = getsockopt(local $in = \*STDIN, SOL_SOCKET, SO_TYPE);
unpack('i', $c) == SOCK_SEQPACKET or die 'stdin is not SOCK_SEQPACKET';
local (%SRCH, %WORKERS);
PublicInbox::Search::load_xapian();
$GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
die 'bad args';
local $workerset = POSIX::SigSet->new;
$workerset->fillset or die "fillset: $!";
for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
$workerset->delset($_) or die "delset($_): $!";
}
local $nworker = $opt->{j};
return recv_loop() if $nworker == 0;
die '-j must be >= 0' if $nworker < 0;
for (POSIX::SIGTERM, POSIX::SIGCHLD) {
$workerset->delset($_) or die "delset($_): $!";
}
my $sig = {
TTIN => sub {
if ($in) {
++$nworker;
PublicInbox::DS::requeue(\&start_workers)
}
},
TTOU => \&do_sigttou,
CHLD => \&PublicInbox::DS::enqueue_reap,
USR1 => \&parent_reopen_logs,
};
PublicInbox::DS::block_signals();
start_workers();
@PublicInbox::DS::post_loop_do = \&xh_alive;
PublicInbox::DS::event_loop($sig);
}
1;