# Copyright (C) all contributors
# License: AGPL-3.0+

# Perl + SWIG||XS implementation used when XapHelperCxx / xap_helper.h
# isn't usable.
#
# Requests arrive on stdin (a SOCK_SEQPACKET socket) as one message each:
# a NUL-terminated, NUL-separated argv (command name first) with file
# descriptors attached.  The first passed FD ($req->{0}) receives command
# output; the second ($req->{1}), when present, receives warnings and
# mset statistics.
package PublicInbox::XapHelper;
use v5.12;
use Getopt::Long (); # good API even if we only use short options
our $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
use PublicInbox::Search qw(xap_terms);
use PublicInbox::CodeSearch;
use PublicInbox::IPC;
use PublicInbox::IO qw(read_all);
use Socket qw(SOL_SOCKET SO_TYPE SOCK_SEQPACKET AF_UNIX);
use PublicInbox::DS qw(awaitpid);
use autodie qw(open getsockopt);
use POSIX qw(:signal_h);
use Fcntl qw(LOCK_UN LOCK_EX);
use Carp qw(croak);
my $X = \%PublicInbox::Search::X;

# %SRCH:      search objects cached by NUL-joined -d directory list
# %WORKERS:   worker number => PID (only meaningful in the parent)
# $nworker:   desired worker count (-j, adjusted by SIGTTIN/SIGTTOU)
# $workerset: signal mask installed while waiting for a request
# $in:        request socket; undef once we should stop
our (%SRCH, %WORKERS, $nworker, $workerset, $in);
our $stderr = \*STDERR; # per-request warning destination (see recv_loop)

# Test-only command: report our PID and whether the search object
# supports thread IDs.
sub cmd_test_inspect {
	my ($req) = @_;
	print { $req->{0} } "pid=$$ has_threadid=",
		($req->{srch}->has_threadid ? 1 : 0)
}

# Test-only command: never returns on its own (presumably used to
# exercise the -K alarm timeout).
sub cmd_test_sleep { select(undef, undef, undef, 0.01) while 1 }

# Called by an iterator after its eval failed, with the error still in $@.
# Returns undef to ask the caller to retry (after reopening the DB on
# DatabaseModifiedError), 0 to skip the document on DocNotFoundError,
# and rethrows any other error.
sub iter_retry_check ($) {
	if (ref($@) =~ /\bDatabaseModifiedError\b/) {
		$_[0]->{srch}->reopen;
		undef; # retries
	} elsif (ref($@) =~ /\bDocNotFoundError\b/) {
		warn "doc not found: $@";
		0; # continue to next doc
	} else {
		die;
	}
}

# Turn each -A argument into [ PREFIX, LENGTH ] in $req->{A_len}: a
# trailing run of digits is stripped and becomes the required term length
# (e.g. "PFX40" => [ "PFX", 40 ]); without digits LENGTH is undef (any
# length).  Note: s/// modifies the elements of @{$req->{A}} in place.
sub term_length_extract ($) {
	my ($req) = @_;
	@{$req->{A_len}} = map {
		my $len = s/([0-9]+)\z// ? ($1 + 0) : undef;
		[ $_, $len ];
	} @{$req->{A}};
}

# Print "TERM IBX_ID\n" to FD 0 for every term of one mset item matching
# the -A prefixes (and lengths), counting lines in $req->{nr_out}.
# Returns 0 on success, else the result of iter_retry_check.
sub dump_ibx_iter ($$$) {
	my ($req, $ibx_id, $it) = @_;
	my $out = $req->{0};
	eval {
		my $doc = $it->get_document;
		for my $pair (@{$req->{A_len}}) {
			my ($pfx, $len) = @$pair;
			my @t = xap_terms($pfx, $doc);
			@t = grep { length == $len } @t if defined($len);
			for (@t) {
				print $out "$_ $ibx_id\n" or die "print: $!";
				++$req->{nr_out};
			}
		}
	};
	$@ ? iter_retry_check($req) : 0;
}

# Write "mset.size=N nr_out=M" to the second FD; croaks if the client
# only passed one FD.
sub emit_mset_stats ($$) {
	my ($req, $mset) = @_;
	my $err = $req->{1} or croak "BUG: caller only passed 1 FD";
	say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
}

# dump_ibx [OPTIONS] IBX_ID QRY_STR
# Requires -A.  -m limits results (default: the DB's document count, i.e.
# all matches), -o sets the offset, -O the eidx_key.  Each item is tried
# up to 10 times while iter_retry_check requests a retry.
sub cmd_dump_ibx {
	my ($req, $ibx_id, $qry_str) = @_;
	$qry_str // die 'usage: dump_ibx [OPTIONS] IBX_ID QRY_STR';
	$req->{A} or die 'dump_ibx requires -A PREFIX';
	term_length_extract $req;
	my $max = $req->{'m'} // $req->{srch}->{xdb}->get_doccount;
	my $opt = { relevance => -1, limit => $max,
			offset => $req->{o} // 0 };
	$opt->{eidx_key} = $req->{O} if defined $req->{O};
	my $mset = $req->{srch}->mset($qry_str, $opt);
	$req->{0}->autoflush(1);
	for my $it ($mset->items) {
		# undef from the iterator keeps $t (retry); 0 ends the loop
		for (my $t = 10; $t > 0; --$t) {
			$t = dump_ibx_iter($req, $ibx_id, $it) // $t;
		}
	}
	emit_mset_stats($req, $mset);
}

# Like dump_ibx_iter, but appends "TERM OFFSETS\n" to $req->{wbuf}, where
# OFFSETS are the space-separated $root2off values of the document's
# 'G' terms.  Returns 0 on success, else the result of iter_retry_check.
sub dump_roots_iter ($$$) {
	my ($req, $root2off, $it) = @_;
	eval {
		my $doc = $it->get_document;
		my $G = join(' ', map { $root2off->{$_} } xap_terms('G', $doc));
		for my $pair (@{$req->{A_len}}) {
			my ($pfx, $len) = @$pair;
			my @t = xap_terms($pfx, $doc);
			@t = grep { length == $len } @t if defined($len);
			for (@t) {
				$req->{wbuf} .= "$_ $G\n";
				++$req->{nr_out};
			}
		}
	};
	$@ ? iter_retry_check($req) : 0;
}

# Write buffered dump_roots output to FD 0 while holding an exclusive
# flock on $fh (presumably to serialize with other processes writing to
# the same destination), then clear the buffer.  flock is retried on EINTR.
sub dump_roots_flush ($$) {
	my ($req, $fh) = @_;
	if ($req->{wbuf} ne '') {
		until (flock($fh, LOCK_EX)) { die "LOCK_EX: $!" if !$!{EINTR} }
		print { $req->{0} } $req->{wbuf} or die "print: $!";
		until (flock($fh, LOCK_UN)) { die "LOCK_UN: $!" if !$!{EINTR} }
		$req->{wbuf} = '';
	}
}

# dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR
# ROOT2ID_FILE holds NUL-separated OIDHEX/offset pairs; its handle is also
# the flock target for dump_roots_flush.  Requires -A; -m and -o set the
# mset limit and offset.  Output is flushed about every 0x4000 lines.
sub cmd_dump_roots {
	my ($req, $root2off_file, $qry_str) = @_;
	$qry_str // die 'usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR';
	$req->{A} or die 'dump_roots requires -A PREFIX';
	term_length_extract $req;
	open my $fh, '<', $root2off_file;
	my $root2off; # record format: $OIDHEX "\0" uint32_t
	my @x = split(/\0/, read_all $fh);
	while (defined(my $oidhex = shift @x)) {
		$root2off->{$oidhex} = shift @x;
	}
	my $opt = { relevance => -1, limit => $req->{'m'},
			offset => $req->{o} // 0 };
	my $mset = $req->{srch}->mset($qry_str, $opt);
	$req->{0}->autoflush(1);
	$req->{wbuf} = '';
	# One document may emit several lines, so nr_out can step over any
	# exact multiple of 0x4000.  Compare against a moving threshold so
	# wbuf is flushed regularly instead of growing until the end.
	my $flush_at = 0x4000;
	for my $it ($mset->items) {
		for (my $t = 10; $t > 0; --$t) {
			$t = dump_roots_iter($req, $root2off, $it) // $t;
		}
		if ($req->{nr_out} >= $flush_at) {
			dump_roots_flush($req, $fh);
			$flush_at = $req->{nr_out} + 0x4000;
		}
	}
	dump_roots_flush($req, $fh);
	emit_mset_stats($req, $mset);
}

# Write "DOCID\0PERCENT\0RANK\n" for one mset item to FD 0 and return
# say()'s result; cmd_mset's loop only repeats the call if that is undef.
sub mset_iter ($$) {
	my ($req, $it) = @_;
	say { $req->{0} } $it->get_docid, "\0",
		$it->get_percent, "\0", $it->get_rank;
}

# mset [OPTIONS] QRY_STR
# Writes a "mset.size=N .get_matches_estimated=M" header to FD 0, then one
# line per item (see mset_iter).  Options: -m limit, -o offset,
# -r relevance, -t threads, -g git_dir, -O eidx_key, -T threadid.
sub cmd_mset { # to be used by WWW + IMAP
	my ($req, $qry_str) = @_;
	$qry_str // die 'usage: mset [OPTIONS] QRY_STR';
	my $opt = { limit => $req->{'m'}, offset => $req->{o} // 0 };
	$opt->{relevance} = 1 if $req->{r};
	$opt->{threads} = 1 if defined $req->{t};
	$opt->{git_dir} = $req->{g} if defined $req->{g};
	$opt->{eidx_key} = $req->{O} if defined $req->{O};
	$opt->{threadid} = $req->{T} if defined $req->{T};
	my $mset = $req->{srch}->mset($qry_str, $opt);
	say { $req->{0} } 'mset.size=', $mset->size,
		' .get_matches_estimated=', $mset->get_matches_estimated;
	for my $it ($mset->items) {
		for (my $t = 10; $t > 0; --$t) {
			$t = mset_iter($req, $it) // $t;
		}
	}
}

# Register extra query-parser prefixes from -Q args: "USER:XPFX" adds a
# regular prefix, "USER=XPFX" a boolean prefix.  Dies on malformed args.
# Marks the search object so this runs only once per cached object.
sub srch_init_extra ($) {
	my ($req) = @_;
	my $qp = $req->{srch}->{qp};
	for (@{$req->{Q}}) {
		my ($upfx, $m, $xpfx) = split /([:=])/;
		$xpfx // die "E: bad -Q $_";
		$m = $m eq '=' ? 'add_boolean_prefix' : 'add_prefix';
		$qp->$m($upfx, $xpfx);
	}
	$req->{srch}->{qp_extra_done} = 1;
}

# Run one request: parse options for the "cmd_$cmd" handler (returning
# without doing anything for unknown commands or bad options), fetch or
# create the cached search object for the -d directories, then call the
# handler with the remaining args.
#   -c selects PublicInbox::CodeSearch instead of PublicInbox::Search.
#   An "iamchert" file in any -d directory leaves FLAG_PHRASE unset.
#   -K SECONDS arms alarm(2) around the handler.
sub dispatch {
	my ($req, $cmd, @argv) = @_;
	my $fn = $req->can("cmd_$cmd") or return;
	$GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC)
		or return;
	my $dirs = delete $req->{d} or die 'no -d args';
	my $key = join("\0", @$dirs);
	my $new;
	$req->{srch} = $SRCH{$key} //= do {
		$new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
		my $first = shift @$dirs;
		my $slow_phrase = -f "$first/iamchert";
		$new->{xdb} = $X->{Database}->new($first);
		for (@$dirs) {
			$slow_phrase ||= -f "$_/iamchert";
			$new->{xdb}->add_database($X->{Database}->new($_));
		}
		$slow_phrase or
			$new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE();
		bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
					'PublicInbox::Search';
		$new->{qp} = $new->qparse_new;
		$new;
	};
	# cached DB handles are reopened so they see newer revisions
	$req->{srch}->{xdb}->reopen unless $new;
	$req->{Q} && !$req->{srch}->{qp_extra_done} and
		srch_init_extra $req;
	my $timeo = $req->{K};
	alarm($timeo) if $timeo;
	$fn->($req, @argv);
	alarm(0) if $timeo;
}

# Request loop for a worker (or the only process with -j0): receive a
# message plus FDs, bind them to $req->{0}, $req->{1}, ..., and dispatch.
# SIGTERM undefs $in, which ends the loop (or exits quietly if recv_cmd
# dies afterwards).  A message without FDs exits with 66 (EX_NOINPUT),
# which reap_worker treats as "stop respawning".  Warnings go to the
# request's second FD while it is handled.
sub recv_loop {
	local $SIG{__WARN__} = sub { print $stderr @_ };
	my $rbuf;
	local $SIG{TERM} = sub { undef $in };
	local $SIG{USR1} = \&reopen_logs;
	while (defined($in)) {
		PublicInbox::DS::sig_setmask($workerset);
		my @fds = eval { # we undef $in in SIG{TERM}
			$PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33)
		};
		if ($@) {
			exit if !$in; # hit by SIGTERM
			die;
		}
		scalar(@fds) or exit(66); # EX_NOINPUT
		die "recvmsg: $!" if !defined($fds[0]);
		# NOTE(review): presumably blocks all signals except SIGALRM
		# while the request runs, so only the -K timeout can interrupt
		PublicInbox::DS::block_signals(POSIX::SIGALRM);
		my $req = bless {}, __PACKAGE__;
		my $i = 0;
		open($req->{$i++}, '+<&=', $_) for @fds;
		local $stderr = $req->{1} // \*STDERR;
		die "not NUL-terminated" if chop($rbuf) ne "\0";
		my @argv = split(/\0/, $rbuf);
		$req->{nr_out} = 0;
		$req->dispatch(@argv) if @argv;
	}
}

# awaitpid callback for a worker exit.  Forgets the worker.  Exit status
# 66 (EX_NOINPUT) stops the parent (undef $in); other non-zero statuses
# are logged.  Missing workers are restarted while $in is still set.
sub reap_worker { # awaitpid CB
	my ($pid, $nr) = @_;
	delete $WORKERS{$nr};
	if (($? >> 8) == 66) { # EX_NOINPUT
		undef $in;
	} elsif ($?) {
		warn "worker[$nr] died \$?=$?\n";
	}
	PublicInbox::DS::requeue(\&start_workers) if $in;
}

# Fork worker number $nr.  The child runs recv_loop and exits.  The
# parent records the PID and registers reap_worker.  A fork failure is
# only warned about.
sub start_worker ($) {
	my ($nr) = @_;
	my $pid = eval { PublicInbox::DS::fork_persist } // return(warn($@));
	if ($pid == 0) {
		undef %WORKERS;
		$SIG{TTIN} = $SIG{TTOU} = 'IGNORE';
		$SIG{CHLD} = 'DEFAULT'; # Xapian may use this
		recv_loop();
		exit(0);
	} else {
		$WORKERS{$nr} = $pid;
		awaitpid($pid, \&reap_worker, $nr);
	}
}

# Start any missing workers numbered 0 .. $nworker-1 while $in is set.
sub start_workers {
	for my $nr (grep { !defined($WORKERS{$_}) } (0..($nworker - 1))) {
		start_worker($nr) if $in;
	}
}

# SIGTTOU handler: lower the worker count by one (never below 1) and
# SIGTERM any workers numbered at or above the new count.
sub do_sigttou {
	if ($in && $nworker > 1) {
		--$nworker;
		my @nr = grep { $_ >= $nworker } keys %WORKERS;
		kill('TERM', @WORKERS{@nr});
	}
}

# Reopen STDOUT/STDERR in append mode from $ENV{STDOUT_PATH} and
# $ENV{STDERR_PATH} (when set) and turn on autoflush.
sub reopen_logs {
	my $p = $ENV{STDOUT_PATH};
	defined($p) && open(STDOUT, '>>', $p) and STDOUT->autoflush(1);
	$p = $ENV{STDERR_PATH};
	defined($p) && open(STDERR, '>>', $p) and STDERR->autoflush(1);
}

# Parent SIGUSR1 handler: reopen our logs and forward USR1 to workers.
sub parent_reopen_logs {
	reopen_logs();
	kill('USR1', values %WORKERS);
}

# Event-loop continuation check: true while the socket is open or any
# worker remains.
sub xh_alive { $in || scalar(keys %WORKERS) }

# Entry point.  stdin must be a SOCK_SEQPACKET socket.  Accepts -j N
# (default 1).  With -j0 requests are served in this process; otherwise
# N workers are forked and supervised from the PublicInbox::DS event loop
# (SIGTTIN/SIGTTOU adjust the count, SIGUSR1 reopens logs).
sub start (@) {
	my (@argv) = @_;
	my $c = getsockopt(local $in = \*STDIN, SOL_SOCKET, SO_TYPE);
	unpack('i', $c) == SOCK_SEQPACKET or die 'stdin is not SOCK_SEQPACKET';

	local (%SRCH, %WORKERS);
	PublicInbox::Search::load_xapian();
	$GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
		die 'bad args';
	# mask used while waiting for requests: everything blocked except
	# the unblockable set and USR1 (plus TERM/CHLD in worker mode)
	local $workerset = POSIX::SigSet->new;
	$workerset->fillset or die "fillset: $!";
	for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
		$workerset->delset($_) or die "delset($_): $!";
	}

	local $nworker = $opt->{j};
	return recv_loop() if $nworker == 0;
	die '-j must be >= 0' if $nworker < 0;
	for (POSIX::SIGTERM, POSIX::SIGCHLD) {
		$workerset->delset($_) or die "delset($_): $!";
	}
	my $sig = {
		TTIN => sub {
			if ($in) {
				++$nworker;
				PublicInbox::DS::requeue(\&start_workers)
			}
		},
		TTOU => \&do_sigttou,
		CHLD => \&PublicInbox::DS::enqueue_reap,
		USR1 => \&parent_reopen_logs,
	};
	PublicInbox::DS::block_signals();
	start_workers();
	@PublicInbox::DS::post_loop_do = \&xh_alive;
	PublicInbox::DS::event_loop($sig);
}

1;