diff options
36 files changed, 639 insertions, 262 deletions
diff --git a/Documentation/RelNotes/v2.0.0.wip b/Documentation/RelNotes/v2.0.0.wip index 4d872fd7..794d7956 100644 --- a/Documentation/RelNotes/v2.0.0.wip +++ b/Documentation/RelNotes/v2.0.0.wip @@ -54,8 +54,9 @@ treewide * SHA-256 coderepos are fully supported (but not inboxes, yet) - * jemalloc (tested as an LD_PRELOAD) is recommended to reduce fragmentation - in long-running daemon processes serving unpredictable traffic + * for daemons serving public traffic, MALLOC_MMAP_THRESHOLD_=131072 is + recommended to reduce fragmentation in glibc malloc, while jemalloc + (tested as an LD_PRELOAD) is another option. PublicInbox::WWW diff --git a/Documentation/public-inbox-daemon.pod b/Documentation/public-inbox-daemon.pod index 6f1e3b53..092be667 100644 --- a/Documentation/public-inbox-daemon.pod +++ b/Documentation/public-inbox-daemon.pod @@ -79,9 +79,9 @@ C<err=> may also be specified on a per-listener basis. Default: /dev/null with C<--daemonize>, inherited otherwise -=item -W +=item -W INTEGER -=item --worker-processes +=item --worker-processes INTEGER Set the number of worker processes. @@ -96,6 +96,40 @@ the master on crashes. Default: 1 +=item -X INTEGER + +=item --xapian-helpers INTEGER + +Enables the use of Xapian helper processes to handle expensive, +non-deterministic Xapian search queries asynchronously without +blocking simple requests. + +With positive values, there is an additional manager process +that can be signaled to control the number of Xapian helper workers. + +* C<-X0> one worker, no manager process +* C<-X1> one worker, one manager process +... +* C<-X8> eight workers, one manager process + +As with the public-facing public-inbox-* daemons, sending C<SIGTTIN> +or C<SIGTTOU> to the Xapian helper manager process will increment or +decrement the number of workers. + +Both Xapian helper workers and managers automatically respawn if they +crash or are explicitly killed, even with C<-X0>. + +A C++ compiler, L<pkg-config(1)>, and Xapian development files (e.g. +C<libxapian-dev> or C<xapian*-core-dev*>) are required to gain access to +some expensive queries and significant memory savings. + +Xapian helper workers are shared by all C<--worker-processes> of the +Perl daemon for additional memory savings. + +New in public-inbox 2.0.0. + +Default: undefined, search queries are handled synchronously + =item --cert /path/to/cert The default TLS certificate for HTTPS, IMAPS, NNTPS, POP3S and/or STARTTLS diff --git a/Documentation/public-inbox-extindex.pod b/Documentation/public-inbox-extindex.pod index b53e45ed..2db7d7e9 100644 --- a/Documentation/public-inbox-extindex.pod +++ b/Documentation/public-inbox-extindex.pod @@ -80,6 +80,19 @@ doubles the size of the already-large Xapian database. Used with C<--reindex>, it will only look for new and stale entries and not touch already-indexed messages. +=item --no-multi-pack-index + +Disable writing a L<git-multi-pack-index(1)> file to save memory. +Normally, enabling multi-pack-index speeds up startup time of +subsequent L<git-cat-file(1)> processes by 3-4%, but generating +this file requires several GB of memory with large repos. + +Unlike the C<core.multiPackIndex> directive in git, it's still +possible to read existing multi-pack-index files if they are +created elsewhere. + +Available in public-inbox 2.0.0+ + =back =head1 FILES diff --git a/Documentation/public-inbox-index.pod b/Documentation/public-inbox-index.pod index 14f157a5..f1a2180a 100644 --- a/Documentation/public-inbox-index.pod +++ b/Documentation/public-inbox-index.pod @@ -192,6 +192,13 @@ external indices are configured. Do not update the C<all> external index by default. This negates all uses of C<-E> / C<--update-extindex=> on the command-line. +=item --no-multi-pack-index + +Disables writing the multi-pack-index when using L</--update-extindex>. +See L<public-inbox-extindex(1)/--no-multi-pack-index> for details. + +Available in public-inbox 2.0.0+ + =item --since=DATESTRING =item --after=DATESTRING diff --git a/Documentation/public-inbox-tuning.pod b/Documentation/public-inbox-tuning.pod index 7d0690b4..892ee0f2 100644 --- a/Documentation/public-inbox-tuning.pod +++ b/Documentation/public-inbox-tuning.pod @@ -165,11 +165,10 @@ capacity planning. Bursts of small object allocations late in process life contribute to fragmentation of the heap due to arenas (slabs) used internally by Perl. -jemalloc (tested as an LD_PRELOAD on GNU/Linux) reduces -overall fragmentation compared to glibc malloc in long-lived processes. -glibc malloc users may try setting C<MALLOC_MMAP_THRESHOLD_> to a lower -value (e.g. 131072) but that may require increasing the -C<sys.vm.max_map_count> sysctl. +glibc malloc users should use C<MALLOC_MMAP_THRESHOLD_=131072> to reduce +fragmentation from the sliding mmap window. jemalloc (tested as an +LD_PRELOAD on GNU/Linux) also reduces fragmentation compared to an +unconfigured glibc malloc in long-lived processes. =head2 Other OS tuning knobs @@ -382,6 +382,8 @@ lib/PublicInbox/XapClient.pm lib/PublicInbox/XapHelper.pm lib/PublicInbox/XapHelperCxx.pm lib/PublicInbox/Xapcmd.pm +lib/PublicInbox/XhcMset.pm +lib/PublicInbox/XhcMsetIterator.pm lib/PublicInbox/gcf2_libgit2.h lib/PublicInbox/xap_helper.h lib/PublicInbox/xh_cidx.h diff --git a/Makefile.PL b/Makefile.PL index 2b2e6b18..27fe02ff 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -255,6 +255,12 @@ check-run : check-man # GNU and *BSD both allow it. check-run_T_ARGS = -j\$(N) +check-xh0 : + \$(MAKE) check-run TEST_DAEMON_XH='-X0' + +check-xh1 : + \$(MAKE) check-run TEST_DAEMON_XH='-X1' + check-debris check-run : pure_all \$(EATMYDATA) \$(PROVE) -bvw xt/\$@.t :: \$(\$\@_T_ARGS) -@\$(check_manifest) diff --git a/examples/public-inbox-httpd@.service b/examples/public-inbox-httpd@.service index 11859198..ca68fc7e 100644 --- a/examples/public-inbox-httpd@.service +++ b/examples/public-inbox-httpd@.service @@ -19,6 +19,7 @@ After = public-inbox-httpd.socket Environment = PI_CONFIG=/home/pi/.public-inbox/config \ PATH=/usr/local/bin:/usr/bin:/bin \ TZ=UTC \ +MALLOC_MMAP_THRESHOLD_=131072 \ PERL_INLINE_DIRECTORY=/tmp/.pub-inline LimitNOFILE = 30000 diff --git a/examples/public-inbox-imapd@.service b/examples/public-inbox-imapd@.service index 80104605..1aede65d 100644 --- a/examples/public-inbox-imapd@.service +++ b/examples/public-inbox-imapd@.service @@ -16,6 +16,8 @@ After = public-inbox-imapd.socket [Service] Environment = PI_CONFIG=/home/pi/.public-inbox/config \ PATH=/usr/local/bin:/usr/bin:/bin \ +TZ=UTC \ +MALLOC_MMAP_THRESHOLD_=131072 \ PERL_INLINE_DIRECTORY=/tmp/.pub-inline LimitNOFILE = 30000 diff --git a/examples/public-inbox-netd@.service b/examples/public-inbox-netd@.service index 7437f086..51f58fbb 100644 --- a/examples/public-inbox-netd@.service +++ b/examples/public-inbox-netd@.service @@ -12,14 +12,15 @@ Wants = public-inbox-netd.socket After = public-inbox-netd.socket [Service] -# An LD_PRELOAD for libjemalloc can be added here. It is -# more resistant to fragmentation in long-lived daemons than glibc. -# If you're unable to use jemalloc, setting MALLOC_MMAP_THRESHOLD_ -# to a lower value (e.g. 131072) but that may also require increasing -# the sys.vm.max_map_count sysctl. + +# Setting MALLOC_MMAP_THRESHOLD_=131072 reduces fragmentation by +# disabling the sliding mmap window in glibc malloc. An LD_PRELOAD for +# libjemalloc may be added here, instead. jemalloc is more resistant to +# fragmentation in long-lived daemons than unconfigured glibc malloc. Environment = PI_CONFIG=/home/pi/.public-inbox/config \ PATH=/usr/local/bin:/usr/bin:/bin \ TZ=UTC \ +MALLOC_MMAP_THRESHOLD_=131072 \ PERL_INLINE_DIRECTORY=/tmp/.netd-inline LimitNOFILE = 30000 diff --git a/examples/public-inbox-nntpd@.service b/examples/public-inbox-nntpd@.service index 24f9ca73..556cb76f 100644 --- a/examples/public-inbox-nntpd@.service +++ b/examples/public-inbox-nntpd@.service @@ -16,6 +16,8 @@ After = public-inbox-nntpd.socket [Service] Environment = PI_CONFIG=/home/pi/.public-inbox/config \ PATH=/usr/local/bin:/usr/bin:/bin \ +TZ=UTC \ +MALLOC_MMAP_THRESHOLD_=131072 \ PERL_INLINE_DIRECTORY=/tmp/.pub-inline LimitNOFILE = 30000 diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm index 2f102ec6..fc77bd03 100644 --- a/lib/PublicInbox/CmdIPC4.pm +++ b/lib/PublicInbox/CmdIPC4.pm @@ -11,8 +11,8 @@ use Socket qw(SOL_SOCKET SCM_RIGHTS); sub sendmsg_retry ($) { return 1 if $!{EINTR}; return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS}); - return if ++$_[0] >= 50; - warn "# sleeping on sendmsg: $! (#$_[0])\n"; + return if --$_[0] < 0; + warn "# sleeping on sendmsg: $! ($_[0] tries left)\n"; select(undef, undef, undef, 0.1); 1; } @@ -22,15 +22,15 @@ require Socket::MsgHdr; # XS no warnings 'once'; # any number of FDs per-sendmsg(2) + buffer -*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_; - my ($sock, $fds, undef, $flags) = @_; +*send_cmd4 = sub ($$$$;$) { # (sock, fds, buf, flags) = @_; + my ($sock, $fds, undef, $flags, $tries) = @_; + $tries //= 50; my $mh = Socket::MsgHdr->new(buf => $_[2]); $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('i' x scalar(@$fds), @$fds)); my $s; - my $try = 0; do { $s = Socket::MsgHdr::sendmsg($sock, $mh, $flags); - } while (!defined($s) && sendmsg_retry($try)); + } while (!defined($s) && sendmsg_retry($tries)); $s; }; diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index ec76d6b8..28458b19 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -22,9 +22,11 @@ use PublicInbox::GitAsyncCat; use PublicInbox::Eml; use PublicInbox::Config; use PublicInbox::OnDestroy; +use PublicInbox::Search; +use PublicInbox::XapClient; our $SO_ACCEPTFILTER = 0x1000; my @CMD; -my ($set_user, $oldset); +my ($set_user, $oldset, $xh_workers); my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); my ($nworker, @listeners, %WORKERS, %logs); my %tls_opt; # scheme://sockname => args for IO::Socket::SSL::SSL_Context->new @@ -170,6 +172,7 @@ options: --cert=FILE default SSL/TLS certificate --key=FILE default SSL/TLS certificate key -W WORKERS number of worker processes to spawn (default: 1) + -X XWORKERS number of Xapian helper processes (default: undefined) See public-inbox-daemon(8) and $prog(1) man pages for more. EOF @@ -185,6 +188,7 @@ EOF 'multi-accept=i' => \$PublicInbox::Listener::MULTI_ACCEPT, 'cert=s' => \$default_cert, 'key=s' => \$default_key, + 'X|xapian-helpers=i' => \$xh_workers, 'help|h' => \(my $show_help), ); GetOptions(%opt) or die $help; @@ -384,10 +388,30 @@ sub worker_quit { # $_[0] = signal name or number (unused) @PublicInbox::DS::post_loop_do = (\&has_busy_clients, { -w => 0 }) } +sub spawn_xh () { + $xh_workers // return; + require PublicInbox::XhcMset; + local $) = $gid if defined $gid; + local $( = $gid if defined $gid; + local $> = $uid if defined $uid; + local $< = $uid if defined $uid; + $PublicInbox::Search::XHC = eval { + local $ENV{STDERR_PATH} = $stderr; + local $ENV{STDOUT_PATH} = $stdout; + PublicInbox::XapClient::start_helper('-j', $xh_workers) + }; + warn "E: $@" if $@; + awaitpid($PublicInbox::Search::XHC->{io}->attached_pid, \&respawn_xh) + if $PublicInbox::Search::XHC; +} + sub reopen_logs { + my ($sig) = @_; $logs{$stdout} //= \*STDOUT if defined $stdout; $logs{$stderr} //= \*STDERR if defined $stderr; while (my ($p, $fh) = each %logs) { open_log_path($fh, $p) } + ($sig && defined($xh_workers) && $PublicInbox::Search::XHC) and + kill('USR1', $PublicInbox::Search::XHC->{io}->attached_pid); } sub sockname ($) { @@ -544,6 +568,7 @@ sub start_worker ($) { my $pid = PublicInbox::DS::fork_persist; if ($pid == 0) { undef %WORKERS; + undef $xh_workers; local $PublicInbox::DS::Poller; # allow epoll/kqueue $set_user->() if $set_user; PublicInbox::EOFpipe->new($parent_pipe, \&worker_quit); @@ -571,8 +596,9 @@ sub master_loop { pipe($parent_pipe, my $p1) or die "failed to create parent-pipe: $!"; my $set_workers = $nworker; # for SIGWINCH reopen_logs(); + spawn_xh; my $msig = { - USR1 => sub { reopen_logs(); kill_workers($_[0]); }, + USR1 => sub { reopen_logs($_[0]); kill_workers($_[0]); }, USR2 => \&upgrade, QUIT => \&master_quit, INT => \&master_quit, @@ -671,6 +697,7 @@ sub daemon_loop () { sub worker_loop { $uid = $gid = undef; reopen_logs(); + spawn_xh; # only for -W0 @listeners = map {; my $l = sockname($_); my $tls_cb = $POST_ACCEPT{$l}; @@ -687,6 +714,13 @@ sub worker_loop { PublicInbox::DS::event_loop(\%WORKER_SIG, $oldset); } +sub respawn_xh { # awaitpid cb + my ($pid) = @_; + return unless @listeners; + warn "W: xap_helper PID:$pid died: \$?=$?, respawning...\n"; + spawn_xh; +} + sub run { my ($default_listen) = @_; $nworker = 1; @@ -699,7 +733,8 @@ sub run { local $PublicInbox::Git::async_warn = 1; local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); local %WORKER_SIG = %WORKER_SIG; - local %POST_ACCEPT; + local $PublicInbox::XapClient::tries = 0; + local $PublicInbox::Search::XHC if defined($xh_workers); daemon_loop(); # $unlink_on_leave runs diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 763a124c..774fa47b 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -1287,7 +1287,7 @@ sub idx_init { # similar to V2Writable ($has_new || $prune_nr || $new ne '') and $self->{mg}->write_alternates($mode, $alt, $new); my $restore = $self->with_umask; - if ($git_midx) { + if ($git_midx && ($opt->{'multi-pack-index'} // 1)) { my @cmd = ('multi-pack-index'); push @cmd, '--no-progress' if ($opt->{quiet}//0) > 1; my $lk = $self->lock_for_scope; diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm index 62112171..20808d6d 100644 --- a/lib/PublicInbox/Isearch.pm +++ b/lib/PublicInbox/Isearch.pm @@ -26,34 +26,44 @@ SELECT ibx_id FROM inboxes WHERE eidx_key = ? LIMIT 1 sub query_approxidate { $_[0]->{es}->query_approxidate($_[1], $_[2]) } -sub mset { - my ($self, $str, $opt) = @_; +sub eidx_mset_prep ($$) { + my ($self, $opt) = @_; my %opt = $opt ? %$opt : (); $opt{eidx_key} = $self->{eidx_key}; - if (my $uid_range = $opt{uid_range}) { - my ($beg, $end) = @$uid_range; - my $ibx_id = $self->{-ibx_id} //= _ibx_id($self); - my $dbh = $self->{es}->over->dbh; - my $sth = $dbh->prepare_cached(<<'', undef, 1); + my $uid_range = $opt{uid_range} or return \%opt; + my ($beg, $end) = @$uid_range; + my $ibx_id = $self->{-ibx_id} //= _ibx_id($self); + my $dbh = $self->{es}->over->dbh; + my $sth = $dbh->prepare_cached(<<'', undef, 1); SELECT MIN(docid) FROM xref3 WHERE ibx_id = ? AND xnum >= ? AND xnum <= ? - $sth->execute($ibx_id, $beg, $end); - my @r = ($sth->fetchrow_array); + $sth->execute($ibx_id, $beg, $end); + my @r = ($sth->fetchrow_array); - $sth = $dbh->prepare_cached(<<'', undef, 1); + $sth = $dbh->prepare_cached(<<'', undef, 1); SELECT MAX(docid) FROM xref3 WHERE ibx_id = ? AND xnum >= ? AND xnum <= ? - $sth->execute($ibx_id, $beg, $end); - $r[1] = $sth->fetchrow_array; - if (defined($r[1]) && defined($r[0])) { - $opt{limit} = $r[1] - $r[0] + 1; - } else { - $r[1] //= $self->{es}->xdb->get_lastdocid; - $r[0] //= 0; - } - $opt{uid_range} = \@r; # these are fed to Xapian and SQLite + $sth->execute($ibx_id, $beg, $end); + $r[1] = $sth->fetchrow_array; + if (defined($r[1]) && defined($r[0])) { + $opt{limit} = $r[1] - $r[0] + 1; + } else { + $r[1] //= $self->{es}->xdb->get_lastdocid; + $r[0] //= 0; } - $self->{es}->mset($str, \%opt); + $opt{uid_range} = \@r; # these are fed to Xapian and SQLite + \%opt; +} + +sub mset { + my ($self, $str, $opt) = @_; + $self->{es}->mset($str, eidx_mset_prep $self, $opt); +} + +sub async_mset { + my ($self, $str, $opt, $cb, @args) = @_; + $opt = eidx_mset_prep $self, $opt; + $self->{es}->async_mset($str, $opt, $cb, @args); } sub mset_to_artnums { diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm index 52f88ae3..17893a09 100644 --- a/lib/PublicInbox/Mbox.pm +++ b/lib/PublicInbox/Mbox.pm @@ -4,7 +4,7 @@ # Streaming interface for mboxrd HTTP responses # See PublicInbox::GzipFilter for details. package PublicInbox::Mbox; -use strict; +use v5.12; use parent 'PublicInbox::GzipFilter'; use PublicInbox::MID qw/mid_escape/; use PublicInbox::Hval qw/to_filename/; @@ -31,8 +31,8 @@ sub async_next { my ($http) = @_; # PublicInbox::HTTP my $ctx = $http->{forward} or return; # client aborted eval { - my $smsg = $ctx->{smsg} or return $ctx->close; - $ctx->smsg_blob($smsg); + my $smsg = $ctx->{smsg} // return $ctx->close; + $ctx->smsg_blob($smsg) if $smsg; }; warn "E: $@" if $@; } @@ -159,6 +159,7 @@ sub all_ids_cb { } $ctx->{ids} = $ids = $over->ids_after(\($ctx->{prev})); } while (@$ids); + undef; } sub mbox_all_ids { @@ -175,56 +176,79 @@ sub mbox_all_ids { PublicInbox::MboxGz::mbox_gz($ctx, \&all_ids_cb, 'all'); } -sub results_cb { - my ($ctx) = @_; - my $over = $ctx->{ibx}->over or return $ctx->gone('over'); - while (1) { - while (defined(my $num = shift(@{$ctx->{ids}}))) { - my $smsg = $over->get_art($num) or next; - return $smsg; - } - # refill result set, deprioritize since there's many results - my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search'); - my $mset = $srch->mset($ctx->{query}, $ctx->{qopts}); - my $size = $mset->size or return; - $ctx->{qopts}->{offset} += $size; - $ctx->{ids} = $srch->mset_to_artnums($mset, $ctx->{qopts}); - $ctx->{-low_prio} = 1; +my $refill_ids_cb = sub { # async_mset cb + my ($ctx, $http, $mset, $err) = @_; + $http = undef unless $ctx->{-really_async}; + if ($err) { + warn "E: $err"; + $ctx->close if $http; # our async httpd + return; } -} - -sub results_thread_cb { - my ($ctx) = @_; + # refill result set, deprioritize since there's many results + my $size = $mset->size or do { + $ctx->close if $http; + $ctx->{-mbox_done} = 1; + return; + }; + $ctx->{qopts}->{offset} += $size; + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); + $ctx->{-low_prio} = 1; # true + return if !$http; + eval { + my $smsg = results_cb($ctx) // return $ctx->close; + return if !$smsg; # '' wait for async_mset + $ctx->smsg_blob($ctx->{smsg} = $smsg); + }; + warn "E: $@" if $@; +}; +sub results_cb { # async_next or MboxGz->getline cb + my ($ctx, $http) = @_; my $over = $ctx->{ibx}->over or return $ctx->gone('over'); while (1) { - while (defined(my $num = shift(@{$ctx->{xids}}))) { + my $ids = $ctx->{xids} // $ctx->{ids}; + while (defined(my $num = shift(@$ids))) { my $smsg = $over->get_art($num) or next; return $smsg; } - - # refills ctx->{xids} - next if $over->expand_thread($ctx); - - # refill result set, deprioritize since there's many results - my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search'); - my $mset = $srch->mset($ctx->{query}, $ctx->{qopts}); - my $size = $mset->size or return; - $ctx->{qopts}->{offset} += $size; - $ctx->{ids} = $srch->mset_to_artnums($mset, $ctx->{qopts}); - $ctx->{-low_prio} = 1; + next if $ctx->{xids} && $over->expand_thread($ctx); + return '' if $ctx->{srch}->async_mset(@$ctx{qw(query qopts)}, + $refill_ids_cb, $ctx, $http); + return if $ctx->{-mbox_done}; } +} +sub mbox_qry_cb { # async_mset cb + my ($ctx, $q, $mset, $err) = @_; + my $wcb = delete $ctx->{wcb}; + if ($err) { + warn "E: $err"; + return $wcb->([500, [qw(Content-Type text/plain)], + [ "Internal server error\n" ]]) + } + $ctx->{qopts}->{offset} = $mset->size or + return $wcb->([404, [qw(Content-Type text/plain)], + ["No results found\n"]]); + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); + my $fn; + if ($q->{t} && $ctx->{srch}->has_threadid) { + $ctx->{xids} = []; # triggers over->expand_thread + $fn = "results-thread-$ctx->{query}"; + } else { + $fn = "results-$ctx->{query}"; + } + require PublicInbox::MboxGz; + my $res = PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); } sub mbox_all { my ($ctx, $q) = @_; - my $q_string = $q->{'q'}; - return mbox_all_ids($ctx) if $q_string !~ /\S/; - my $srch = $ctx->{ibx}->isrch or + my $qstr = $q->{'q'}; + return mbox_all_ids($ctx) if $qstr !~ /\S/; + my $srch = $ctx->{srch} = $ctx->{ibx}->isrch or return PublicInbox::WWW::need($ctx, 'Search'); - - my $qopts = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC + my $opt = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC # {threadid} limits results to a given thread # {threads} collapses results from messages in the same thread, @@ -234,25 +258,16 @@ sub mbox_all { $ctx->{ibx}->{isrch}->{es}->over : $ctx->{ibx}->over) or return PublicInbox::WWW::need($ctx, 'Overview'); - $qopts->{threadid} = $over->mid2tid($ctx->{mid}); - } - $qopts->{threads} = 1 if $q->{t}; - $srch->query_approxidate($ctx->{ibx}->git, $q_string); - my $mset = $srch->mset($q_string, $qopts); - $qopts->{offset} = $mset->size or - return [404, [qw(Content-Type text/plain)], - ["No results found\n"]]; - $ctx->{query} = $q_string; - $ctx->{ids} = $srch->mset_to_artnums($mset, $qopts); - require PublicInbox::MboxGz; - my $fn; - if ($q->{t} && $srch->has_threadid) { - $fn = 'results-thread-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_thread_cb, $fn); - } else { - $fn = 'results-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + $opt->{threadid} = $over->mid2tid($ctx->{mid}); } + $opt->{threads} = 1 if $q->{t}; + $srch->query_approxidate($ctx->{ibx}->git, $qstr); + $ctx->{query} = $qstr; + sub { # called by PSGI server + $ctx->{wcb} = $_[0]; # PSGI server supplied write cb + $srch->async_mset($qstr, $opt, \&mbox_qry_cb, $ctx, $q) and + $ctx->{-really_async} = 1; + }; } 1; diff --git a/lib/PublicInbox/MboxGz.pm b/lib/PublicInbox/MboxGz.pm index 533d2ff1..90e69c09 100644 --- a/lib/PublicInbox/MboxGz.pm +++ b/lib/PublicInbox/MboxGz.pm @@ -1,7 +1,7 @@ # Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> package PublicInbox::MboxGz; -use strict; +use v5.12; use parent 'PublicInbox::GzipFilter'; use PublicInbox::Eml; use PublicInbox::Hval qw/to_filename/; @@ -13,8 +13,8 @@ sub async_next ($) { my ($http) = @_; # PublicInbox::HTTP my $ctx = $http->{forward} or return; eval { - $ctx->{smsg} = $ctx->{cb}->($ctx) or return $ctx->close; - $ctx->smsg_blob($ctx->{smsg}); + my $smsg = $ctx->{cb}->($ctx, $http) // return $ctx->close; + $smsg and $ctx->smsg_blob($ctx->{smsg} = $smsg); }; warn "E: $@" if $@; } diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 678c8c5d..fbdb48a3 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -11,6 +11,7 @@ our @EXPORT_OK = qw(retry_reopen int_val get_pct xap_terms); use List::Util qw(max); use POSIX qw(strftime); use Carp (); +our $XHC = 0; # defined but false # values for searching, changing the numeric value breaks # compatibility with old indices (so don't change them it) @@ -56,7 +57,7 @@ use constant { }; use PublicInbox::Smsg; -use PublicInbox::Over; +eval { require PublicInbox::Over }; our $QP_FLAGS; our %X = map { $_ => 0 } qw(BoolWeight Database Enquire QueryParser Stem Query); our $Xap; # 'Xapian' or 'Search::Xapian' @@ -85,11 +86,9 @@ our @XH_SPEC = ( 'k=i', # sort column (like sort(1)) 'm=i', # maximum number of results 'o=i', # offset - 'p', # show percent 'r', # 1=relevance then column 't', # collapse threads 'A=s@', # prefixes - 'D', # emit docdata 'K=i', # timeout kill after i seconds 'O=s', # eidx_key 'T=i', # threadid @@ -429,6 +428,59 @@ sub mset { do_enquire($self, $qry, $opt, TS); } +sub xhc_start_maybe (@) { + require PublicInbox::XapClient; + my $xhc = PublicInbox::XapClient::start_helper(@_); + require PublicInbox::XhcMset if $xhc; + $xhc; +} + +sub xh_opt ($) { + my ($opt) = @_; + my $lim = $opt->{limit} || 50; + my @ret; + push @ret, '-o', $opt->{offset} if $opt->{offset}; + push @ret, '-m', $lim; + my $rel = $opt->{relevance} // 0; + if ($rel == -2) { # ORDER BY docid/UID (highest first) + push @ret, '-k', '-1'; + } elsif ($rel == -1) { # ORDER BY docid/UID (lowest first) + push @ret, '-k', '-1'; + push @ret, '-a'; + } elsif ($rel == 0) { + push @ret, '-k', $opt->{sort_col} // TS; + push @ret, '-a' if $opt->{asc}; + } else { # rel > 0 + push @ret, '-r'; + push @ret, '-k', $opt->{sort_col} // TS; + push @ret, '-a' if $opt->{asc}; + } + push @ret, '-t' if $opt->{threads}; + push @ret, '-T', $opt->{threadid} if defined $opt->{threadid}; + push @ret, '-O', $opt->{eidx_key} if defined $opt->{eidx_key}; + @ret; +} + +# returns a true value if actually handled asynchronously, +# and a falsy value if handled synchronously +sub async_mset { + my ($self, $qry_str, $opt, $cb, @args) = @_; + if ($XHC) { # unconditionally retrieving pct + rank for now + xdb($self); # populate {nshards} + my @margs = ($self->xh_args, xh_opt($opt)); + my $ret = eval { + my $rd = $XHC->mkreq(undef, 'mset', @margs, $qry_str); + PublicInbox::XhcMset->maybe_new($rd, $self, $cb, @args); + }; + $cb->(@args, undef, $@) if $@; + $ret; + } else { # synchronous + my $mset = $self->mset($qry_str, $opt); + $cb->(@args, $mset); + undef; + } +} + sub do_enquire { # shared with CodeSearch my ($self, $qry, $opt, $col) = @_; my $enq = $X{Enquire}->new(xdb($self)); diff --git a/lib/PublicInbox/SearchView.pm b/lib/PublicInbox/SearchView.pm index 2d3e942c..9919e25c 100644 --- a/lib/PublicInbox/SearchView.pm +++ b/lib/PublicInbox/SearchView.pm @@ -30,10 +30,9 @@ sub mbox_results { sub sres_top_html { my ($ctx) = @_; - my $srch = $ctx->{ibx}->isrch or + my $srch = $ctx->{srch} = $ctx->{ibx}->isrch or return PublicInbox::WWW::need($ctx, 'Search'); my $q = PublicInbox::SearchQuery->new($ctx->{qp}); - my $x = $q->{x}; my $o = $q->{o} // 0; my $asc; if ($o < 0) { @@ -41,48 +40,57 @@ sub sres_top_html { $o = -($o + 1); # so [-1] is the last element, like Perl lists } - my $code = 200; # double the limit for expanded views: - my $opts = { + my $opt = { limit => $q->{l}, offset => $o, relevance => $q->{r}, threads => $q->{t}, asc => $asc, }; - my ($mset, $total, $err, $html); -retry: - eval { - my $query = $q->{'q'}; - $srch->query_approxidate($ctx->{ibx}->git, $query); - $mset = $srch->mset($query, $opts); - $total = $mset->get_matches_estimated; - }; - $err = $@; + my $qs = $q->{'q'}; + $srch->query_approxidate($ctx->{ibx}->git, $qs); + sub { + $ctx->{wcb} = $_[0]; # PSGI server supplied write cb + $srch->async_mset($qs, $opt, \&sres_html_cb, $ctx, $opt, $q); + } +} + +sub sres_html_cb { # async_mset cb + my ($ctx, $opt, $q, $mset, $err) = @_; + my $code = 200; + my $total = $mset ? $mset->get_matches_estimated : undef; ctx_prepare($q, $ctx); + my ($res, $html); if ($err) { $code = 400; $html = '<pre>'.err_txt($ctx, $err).'</pre><hr>'; } elsif ($total == 0) { - if (defined($ctx->{-uxs_retried})) { - # undo retry damage: + if (defined($ctx->{-uxs_retried})) { # undo retry damage: $q->{'q'} = $ctx->{-uxs_retried}; - } elsif (index($q->{'q'}, '%') >= 0) { + } elsif (index($q->{'q'}, '%') >= 0) { # retry unescaped $ctx->{-uxs_retried} = $q->{'q'}; - $q->{'q'} = uri_unescape($q->{'q'}); - goto retry; + my $qs = $q->{'q'} = uri_unescape($q->{'q'}); + $ctx->{srch}->query_approxidate($ctx->{ibx}->git, $qs); + return $ctx->{srch}->async_mset($qs, $opt, + \&sres_html_cb, $ctx, $opt, $q); } $code = 404; $html = "<pre>\n[No results found]</pre><hr>"; + } elsif ($q->{x} eq 'A') { + $res = adump($mset, $q, $ctx); } else { - return adump($_[0], $mset, $q, $ctx) if $x eq 'A'; - $ctx->{-html_tip} = search_nav_top($mset, $q, $ctx); - return mset_thread($ctx, $mset, $q) if $x eq 't'; - mset_summary($ctx, $mset, $q); # appends to {-html_tip} - $html = ''; + if ($q->{x} eq 't') { + $res = mset_thread($ctx, $mset, $q); + } else { + mset_summary($ctx, $mset, $q); # appends to {-html_tip} + $html = ''; + } } - html_oneshot($ctx, $code, $html); + $res //= html_oneshot($ctx, $code, $html); + my $wcb = delete $ctx->{wcb}; + ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); } # display non-nested search results similar to what users expect from @@ -357,7 +365,7 @@ sub ctx_prepare { } sub adump { - my ($cb, $mset, $q, $ctx) = @_; + my ($mset, $q, $ctx) = @_; $ctx->{ids} = $ctx->{ibx}->isrch->mset_to_artnums($mset); $ctx->{search_query} = $q; # used by WwwAtomStream::atom_header PublicInbox::WwwAtomStream->response($ctx, \&adump_i); diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index e36659ce..e9e81e88 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -176,15 +176,15 @@ out: return (int)pid; } -static int sendmsg_retry(unsigned *tries) +static int sendmsg_retry(int *tries) { const struct timespec req = { 0, 100000000 }; /* 100ms */ int err = errno; switch (err) { case EINTR: PERL_ASYNC_CHECK(); return 1; case ENOBUFS: case ENOMEM: case ETOOMANYREFS: - if (++*tries >= 50) return 0; - fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n", + if (--*tries < 0) return 0; + fprintf(stderr, "# sleeping on sendmsg: %s (%d tries left)\n", strerror(err), *tries); nanosleep(&req, NULL); PERL_ASYNC_CHECK(); @@ -201,7 +201,7 @@ union my_cmsg { char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE]; }; -SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) +SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, int tries) { struct msghdr msg = { 0 }; union my_cmsg cmsg = { 0 }; @@ -211,7 +211,6 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) AV *fds = (AV *)SvRV(svfds); I32 i, nfds = av_len(fds) + 1; int *fdp; - unsigned tries = 0; if (SvOK(data)) { iov.iov_base = SvPV(data, dlen); @@ -332,6 +331,9 @@ EOM if (defined $all_libc) { # set for Gcf2 $ENV{PERL_INLINE_DIRECTORY} = $inline_dir; %RLIMITS = rlimit_map(); + *send_cmd4 = sub ($$$$;$) { + send_cmd4_($_[0], $_[1], $_[2], $_[3], 50); + } } else { require PublicInbox::SpawnPP; *pi_fork_exec = \&PublicInbox::SpawnPP::pi_fork_exec diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 99af5bf5..4cbe9623 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -467,8 +467,8 @@ if (defined($SYS_sendmsg) && defined($SYS_recvmsg)) { no warnings 'once'; require PublicInbox::CmdIPC4; -*send_cmd4 = sub ($$$$) { - my ($sock, $fds, undef, $flags) = @_; +*send_cmd4 = sub ($$$$;$) { + my ($sock, $fds, undef, $flags, $tries) = @_; my $iov = pack('P'.TMPL_size_t, $_[2] // NUL, length($_[2] // NUL) || 1); my $fd_space = scalar(@$fds) * SIZEOF_int; @@ -487,10 +487,10 @@ require PublicInbox::CmdIPC4; $msg_controllen, 0); # msg_flags my $s; - my $try = 0; + $tries //= 50; do { $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags); - } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($try)); + } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($tries)); $s >= 0 ? $s : undef; }; diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index a7ec9b5b..aeff5d1d 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -17,9 +17,10 @@ my $lei_loud = $ENV{TEST_LEI_ERR_LOUD}; our $tail_cmd = $ENV{TAIL}; our ($lei_opt, $lei_out, $lei_err); use autodie qw(chdir close fcntl mkdir open opendir seek unlink); +$ENV{XDG_CACHE_HOME} //= "$ENV{HOME}/.cache"; # reuse C++ xap_helper builds $_ = File::Spec->rel2abs($_) for (grep(!m!^/!, @INC)); - +our $CURRENT_DAEMON; BEGIN { @EXPORT = qw(tmpdir tcp_server tcp_connect require_git require_mods run_script start_script key2sub xsys xsys_e xqx eml_load tick @@ -565,6 +566,9 @@ sub start_script { my $run_mode = $ENV{TEST_RUN_MODE} // $opt->{run_mode} // 2; my $sub = $run_mode == 0 ? undef : key2sub($key); my $tail; + my @xh = split(/\s+/, $ENV{TEST_DAEMON_XH} // ''); + @xh = () if $key !~ /-(?:imapd|netd|httpd|pop3d|nntpd)\z/; + push @argv, @xh; if ($tail_cmd) { my @paths; for (@argv) { @@ -612,7 +616,7 @@ sub start_script { $ENV{LISTEN_FDS} = $fds; } if ($opt->{-C}) { chdir($opt->{-C}) } - $0 = join(' ', @$cmd); + $0 = join(' ', @$cmd, @xh); local @SIG{keys %SIG} = map { undef } values %SIG; local $SIG{FPE} = 'IGNORE'; # Perl default undef $tmp_mask; @@ -720,7 +724,10 @@ SKIP: { require PublicInbox::Spawn; require PublicInbox::Config; require File::Path; - + eval { # use XDG_CACHE_HOME, first: + require PublicInbox::XapHelperCxx; + PublicInbox::XapHelperCxx::check_build(); + }; local %ENV = %ENV; delete $ENV{XDG_DATA_HOME}; delete $ENV{XDG_CONFIG_HOME}; @@ -945,6 +952,7 @@ sub test_httpd ($$;$$) { local $ENV{PLACK_TEST_EXTERNALSERVER_URI} = "http://$h:$p"; my $ua = LWP::UserAgent->new; $ua->max_redirect(0); + local $CURRENT_DAEMON = $td; Plack::Test::ExternalServer::test_psgi(client => $client, ua => $ua); $cb->() if $cb; diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm index 98034130..24b3f45e 100644 --- a/lib/PublicInbox/XapClient.pm +++ b/lib/PublicInbox/XapClient.pm @@ -12,6 +12,7 @@ use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_SEQPACKET); use PublicInbox::IPC; use autodie qw(pipe socketpair); +our $tries = 50; sub mkreq { my ($self, $ios, @arg) = @_; @@ -19,13 +20,14 @@ sub mkreq { pipe($r, $ios->[0]) if !defined($ios->[0]); my @fds = map fileno($_), @$ios; my $buf = join("\0", @arg, ''); - $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0) // - die "send_cmd: $!"; + $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0, $tries) + // die "send_cmd: $!"; $n == length($buf) or die "send_cmd: $n != ".length($buf); $r; } -sub start_helper { +sub start_helper (@) { + $PublicInbox::IPC::send_cmd or return; # can't work w/o SCM_RIGHTS my @argv = @_; socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0); my $cls = 'PublicInbox::XapHelperCxx'; diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index 8c7732f5..2e20660e 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -27,6 +27,8 @@ sub cmd_test_inspect { ($req->{srch}->has_threadid ? 1 : 0) } +sub cmd_test_sleep { select(undef, undef, undef, 0.01) while 1 } + sub iter_retry_check ($) { if (ref($@) =~ /\bDatabaseModifiedError\b/) { $_[0]->{srch}->reopen; @@ -147,17 +149,8 @@ sub cmd_dump_roots { sub mset_iter ($$) { my ($req, $it) = @_; - eval { - my $buf = $it->get_docid; - $buf .= "\0".$it->get_percent if $req->{p}; - my $doc = ($req->{A} || $req->{D}) ? $it->get_document : undef; - for my $p (@{$req->{A}}) { - $buf .= "\0".$p.$_ for xap_terms($p, $doc); - } - $buf .= "\0".$doc->get_data if $req->{D}; - say { $req->{0} } $buf; - }; - $@ ? iter_retry_check($req) : 0; + say { $req->{0} } $it->get_docid, "\0", + $it->get_percent, "\0", $it->get_rank; } sub cmd_mset { # to be used by WWW + IMAP @@ -170,7 +163,8 @@ sub cmd_mset { # to be used by WWW + IMAP $opt->{eidx_key} = $req->{O} if defined $req->{O}; $opt->{threadid} = $req->{T} if defined $req->{T}; my $mset = $req->{srch}->mset($qry_str, $opt); - say { $req->{0} } 'mset.size=', $mset->size; + say { $req->{0} } 'mset.size=', $mset->size, + ' .get_matches_estimated=', $mset->get_matches_estimated; for my $it ($mset->items) { for (my $t = 10; $t > 0; --$t) { $t = mset_iter($req, $it) // $t; @@ -201,13 +195,17 @@ sub dispatch { $new->{qp} = $new->qparse_new; $new; }; + my $timeo = $req->{K}; + alarm($timeo) if $timeo; $fn->($req, @argv); + alarm(0) if $timeo; } sub recv_loop { local $SIG{__WARN__} = sub { print $stderr @_ }; my $rbuf; local $SIG{TERM} = sub { undef $in }; + local $SIG{USR1} = \&reopen_logs; while (defined($in)) { PublicInbox::DS::sig_setmask($workerset); my @fds = eval { # we undef $in in SIG{TERM} @@ -219,7 +217,7 @@ sub recv_loop { } scalar(@fds) or exit(66); # EX_NOINPUT die "recvmsg: $!" if !defined($fds[0]); - PublicInbox::DS::block_signals(); + PublicInbox::DS::block_signals(POSIX::SIGALRM); my $req = bless {}, __PACKAGE__; my $i = 0; open($req->{$i++}, '+<&=', $_) for @fds; @@ -271,6 +269,18 @@ sub do_sigttou { } } +sub reopen_logs { + my $p = $ENV{STDOUT_PATH}; + defined($p) && open(STDOUT, '>>', $p) and STDOUT->autoflush(1); + $p = $ENV{STDERR_PATH}; + defined($p) && open(STDERR, '>>', $p) and STDERR->autoflush(1); +} + +sub parent_reopen_logs { + reopen_logs(); + kill('USR1', values %WORKERS); +} + sub xh_alive { $in || scalar(keys %WORKERS) } sub start (@) { @@ -284,7 +294,7 @@ sub start (@) { die 'bad args'; local $workerset = POSIX::SigSet->new; $workerset->fillset or die "fillset: $!"; - for (@PublicInbox::DS::UNBLOCKABLE) { + for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) { $workerset->delset($_) or die "delset($_): $!"; } @@ -303,6 +313,7 @@ sub start (@) { }, TTOU => \&do_sigttou, CHLD => \&PublicInbox::DS::enqueue_reap, + USR1 => \&parent_reopen_logs, }; PublicInbox::DS::block_signals(); start_workers(); diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm index eafe61a8..74852ad1 100644 --- a/lib/PublicInbox/XapHelperCxx.pm +++ b/lib/PublicInbox/XapHelperCxx.pm @@ -16,8 +16,15 @@ use autodie; my $cxx = which($ENV{CXX} // 'c++') // which('clang') // die 'no C++ compiler'; my $dir = substr("$cxx-$Config{archname}", 1); # drop leading '/' $dir =~ tr!/!-!; -my $idir = ($ENV{XDG_CACHE_HOME} // - (($ENV{HOME} // die('HOME unset')).'/.cache')).'/public-inbox/jaot'; +my $idir; +if ((defined($ENV{XDG_CACHE_HOME}) && -d $ENV{XDG_CACHE_HOME}) || + (defined($ENV{HOME}) && -d $ENV{HOME})) { + $idir = ($ENV{XDG_CACHE_HOME} // + (($ENV{HOME} // die('HOME unset')).'/.cache') + ).'/public-inbox/jaot'; +} +$idir //= $ENV{PERL_INLINE_DIRECTORY} // + die 'HOME and PERL_INLINE_DIRECTORY unset'; substr($dir, 0, 0) = "$idir/"; my $bin = "$dir/xap_helper"; my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!); @@ -58,7 +65,11 @@ sub needs_rebuild () { sub build () { if (!-d $dir) { require File::Path; - File::Path::make_path($dir); + eval { File::Path::make_path($dir) }; + if (!-d $dir && defined($ENV{PERL_INLINE_DIRECTORY})) { + $dir = $ENV{PERL_INLINE_DIRECTORY}; + File::Path::make_path($dir); + } } require PublicInbox::CodeSearch; require PublicInbox::Lock; diff --git a/lib/PublicInbox/XhcMset.pm b/lib/PublicInbox/XhcMset.pm new file mode 100644 index 00000000..ac25eece --- /dev/null +++ b/lib/PublicInbox/XhcMset.pm @@ -0,0 +1,51 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# mocks Xapian::Mset and allows slow queries from blocking the event loop +package PublicInbox::XhcMset; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::XhcMsetIterator; +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); + +sub event_step { + my ($self) = @_; + my ($cb, @args) = @{delete $self->{cb_args} // return}; + my $rd = $self->{sock}; + eval { + my $hdr = <$rd> // die "E: reading mset header: $!"; + for (split /\s+/, $hdr) { # read mset.size + estimated_matches + my ($k, $v) = split /=/, $_, 2; + $k =~ s/\A[^\.]*\.//; # s/(mset)?\./ + $self->{$k} = $v; + } + my $size = $self->{size} // die "E: bad xhc header: `$hdr'"; + my @it = map { PublicInbox::XhcMsetIterator::make($_) } <$rd>; + $self->{items} = \@it; + scalar(@it) == $size or die + 'E: got ',scalar(@it),", expected mset.size=$size"; + }; + my $err = $@; + $self->close; + eval { $cb->(@args, $self, $err) }; + warn "E: $@\n" if $@; +} + +sub maybe_new { + my (undef, $rd, $srch, @cb_args) = @_; + my $self = bless { cb_args => \@cb_args, srch => $srch }, __PACKAGE__; + if ($PublicInbox::DS::in_loop) { # async + $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT); + } else { # synchronous + $self->{sock} = $rd; + event_step($self); + undef; + } +} + +eval(join('', map { "sub $_ { \$_[0]->{$_} }\n" } qw(size + get_matches_estimated))); + +sub items { @{$_[0]->{items}} } + +1; diff --git a/lib/PublicInbox/XhcMsetIterator.pm b/lib/PublicInbox/XhcMsetIterator.pm new file mode 100644 index 00000000..dcfc61e4 --- /dev/null +++ b/lib/PublicInbox/XhcMsetIterator.pm @@ -0,0 +1,20 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# mocks Xapian::MsetIterator, there's many of these allocated at once +package PublicInbox::XhcMsetIterator; +use v5.12; + +sub make ($) { + chomp($_[0]); + my @self = map { $_ + 0 } split /\0/, $_[0]; # docid, pct, rank + # we don't store $xdb in self[4] since we avoid $it->get_document + # in favor of $xdb->get_document($it->get_docid) + bless \@self, __PACKAGE__; +} + +sub get_docid { $_[0]->[0] } +sub get_percent { $_[0]->[1] } +sub get_rank { $_[0]->[2] } + +1; diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 3456910b..3df3ce91 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -27,6 +27,7 @@ #include <sys/types.h> #include <sys/uio.h> #include <sys/wait.h> +#include <poll.h> #include <assert.h> #include <err.h> // BSD, glibc, and musl all have this @@ -95,6 +96,8 @@ static pid_t *worker_pids; // nr => pid #define WORKER_MAX USHRT_MAX static unsigned long nworker, nworker_hwm; static int pipefds[2]; +static const char *stdout_path, *stderr_path; // for SIGUSR1 +static sig_atomic_t worker_needs_reopen; // PublicInbox::Search and PublicInbox::CodeSearch generate these: static void mail_nrp_init(void); @@ -141,8 +144,6 @@ struct req { // argv and pfxv point into global rbuf bool collapse_threads; bool code_search; bool relevance; // sort by relevance before column - bool emit_percent; - bool emit_docdata; bool asc; // ascending sort }; @@ -226,6 +227,13 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str) qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, Xapian::Query(req->Oeidx_key)); } + // TODO: uid_range + if (req->threadid != ULLONG_MAX) { + std::string tid = Xapian::sortable_serialise(req->threadid); + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(Xapian::Query::OP_VALUE_RANGE, THREADID, + tid, tid)); + } Xapian::Enquire enq = prep_enquire(req); enq.set_query(qry); // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm @@ -406,6 +414,11 @@ static bool cmd_test_inspect(struct req *req) return false; } +static bool cmd_test_sleep(struct req *req) +{ + for (;;) poll(NULL, 0, 10); + return false; +} #include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff #include "xh_cidx.h" // CodeSearchIdx.pm stuff @@ -420,6 +433,7 @@ static const struct cmd_entry { CMD(dump_ibx), // many inboxes CMD(dump_roots), // per-cidx shard CMD(test_inspect), // least common commands last + CMD(test_sleep), // least common commands last }; #define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) @@ -633,7 +647,6 @@ static void dispatch(struct req *req) if (*end || req->off == ULLONG_MAX) ABORT("-o %s", optarg); break; - case 'p': req->emit_percent = true; break; case 'r': req->relevance = true; break; case 't': req->collapse_threads = true; break; case 'A': @@ -641,7 +654,6 @@ static void dispatch(struct req *req) if (MY_ARG_MAX == req->pfxc) ABORT("too many -A"); break; - case 'D': req->emit_docdata = true; break; case 'K': req->timeout_sec = strtoul(optarg, &end, 10); if (*end || req->timeout_sec == ULONG_MAX) @@ -675,6 +687,9 @@ static void dispatch(struct req *req) free_srch(kbuf.srch); goto cmd_err; // srch_init already warned } + if (req->timeout_sec) + alarm(req->timeout_sec > UINT_MAX ? + UINT_MAX : (unsigned)req->timeout_sec); try { if (!req->fn(req)) warnx("`%s' failed", req->argv[0]); @@ -683,6 +698,8 @@ static void dispatch(struct req *req) } catch (...) { warn("unhandled exception"); } + if (req->timeout_sec) + alarm(0); cmd_err: return; // just be silent on errors, for now } @@ -723,9 +740,12 @@ static void stderr_restore(FILE *tmp_err) clearerr(stderr); } -static void sigw(int sig) // SIGTERM handler for worker +static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker { - sock_fd = -1; // break out of recv_loop + switch (sig) { + case SIGUSR1: worker_needs_reopen = 1; break; + default: sock_fd = -1; // break out of recv_loop + } } #define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup))) @@ -735,6 +755,18 @@ static void req_cleanup(void *ptr) free(req->lenv); } +static void reopen_logs(void) +{ + if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout)) + err(EXIT_FAILURE, "freopen %s", stdout_path); + if (stderr_path && *stderr_path) { + if (!freopen(stderr_path, "a", stderr)) + err(EXIT_FAILURE, "freopen %s", stderr_path); + if (my_setlinebuf(stderr)) + err(EXIT_FAILURE, "setlinebuf(stderr)"); + } +} + static void recv_loop(void) // worker process loop { static char rbuf[4096 * 33]; // per-process @@ -742,6 +774,7 @@ static void recv_loop(void) // worker process loop sa.sa_handler = sigw; CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); while (sock_fd == 0) { size_t len = sizeof(rbuf); @@ -758,6 +791,10 @@ static void recv_loop(void) // worker process loop stderr_restore(req.fp[1]); ERR_CLOSE(req.fp[1], 0); } + if (worker_needs_reopen) { + worker_needs_reopen = 0; + reopen_logs(); + } } } @@ -810,6 +847,16 @@ static void cleanup_all(void) #endif } +static void parent_reopen_logs(void) +{ + reopen_logs(); + for (unsigned long nr = nworker; nr < nworker_hwm; nr++) { + pid_t pid = worker_pids[nr]; + if (pid != 0 && kill(pid, SIGUSR1)) + warn("BUG?: kill(%d, SIGUSR1)", (int)pid); + } +} + static void sigp(int sig) // parent signal handler { static const char eagain[] = "signals coming in too fast"; @@ -822,6 +869,7 @@ static void sigp(int sig) // parent signal handler case SIGCHLD: c = '.'; break; case SIGTTOU: c = '-'; break; case SIGTTIN: c = '+'; break; + case SIGUSR1: c = '#'; break; default: write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1); _exit(EXIT_FAILURE); @@ -928,6 +976,8 @@ int main(int argc, char *argv[]) { int c; socklen_t slen = (socklen_t)sizeof(c); + stdout_path = getenv("STDOUT_PATH"); + stderr_path = getenv("STDERR_PATH"); if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) err(EXIT_FAILURE, "getsockopt"); @@ -945,12 +995,6 @@ int main(int argc, char *argv[]) } nworker = 1; -#ifdef _SC_NPROCESSORS_ONLN - long j = sysconf(_SC_NPROCESSORS_ONLN); - if (j > 0) - nworker = j > WORKER_MAX ? WORKER_MAX : j; -#endif // _SC_NPROCESSORS_ONLN - // make warn/warnx/err multi-process friendly: if (my_setlinebuf(stderr)) err(EXIT_FAILURE, "setlinebuf(stderr)"); @@ -992,6 +1036,8 @@ int main(int argc, char *argv[]) DELSET(SIGXCPU); DELSET(SIGXFSZ); #undef DELSET + CHECK(int, 0, sigdelset(&workerset, SIGUSR1)); + CHECK(int, 0, sigdelset(&fullset, SIGALRM)); if (nworker == 0) { // no SIGTERM handling w/o workers recv_loop(); @@ -1012,10 +1058,12 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigdelset(&pset, SIGCHLD)); CHECK(int, 0, sigdelset(&pset, SIGTTIN)); CHECK(int, 0, sigdelset(&pset, SIGTTOU)); + CHECK(int, 0, sigdelset(&pset, SIGUSR1)); struct sigaction sa = {}; sa.sa_handler = sigp; + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL)); sa.sa_flags = SA_NOCLDSTOP; @@ -1040,6 +1088,7 @@ int main(int argc, char *argv[]) case '.': break; // do_sigchld already called case '-': do_sigttou(); break; case '+': do_sigttin(); break; + case '#': parent_reopen_logs(); break; default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); } } diff --git a/lib/PublicInbox/xh_mset.h b/lib/PublicInbox/xh_mset.h index 4e97a284..db2692c9 100644 --- a/lib/PublicInbox/xh_mset.h +++ b/lib/PublicInbox/xh_mset.h @@ -3,49 +3,6 @@ // This file is only intended to be included by xap_helper.h // it implements pieces used by WWW, IMAP and lei -static void emit_doc_term(FILE *fp, const char *pfx, Xapian::Document *doc) -{ - Xapian::TermIterator cur = doc->termlist_begin(); - Xapian::TermIterator end = doc->termlist_end(); - size_t pfx_len = strlen(pfx); - - for (cur.skip_to(pfx); cur != end; cur++) { - std::string tn = *cur; - if (!starts_with(&tn, pfx, pfx_len)) break; - fputc(0, fp); - fwrite(tn.data(), tn.size(), 1, fp); - } -} - -static enum exc_iter mset_iter(const struct req *req, FILE *fp, off_t off, - Xapian::MSetIterator *i) -{ - try { - fprintf(fp, "%llu", (unsigned long long)(*(*i))); // get_docid - if (req->emit_percent) - fprintf(fp, "%c%d", 0, i->get_percent()); - if (req->pfxc || req->emit_docdata) { - Xapian::Document doc = i->get_document(); - for (int p = 0; p < req->pfxc; p++) - emit_doc_term(fp, req->pfxv[p], &doc); - if (req->emit_docdata) { - std::string d = doc.get_data(); - fputc(0, fp); - fwrite(d.data(), d.size(), 1, fp); - } - } - fputc('\n', fp); - } catch (const Xapian::DatabaseModifiedError & e) { - req->srch->db->reopen(); - if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko"); - return ITER_RETRY; - } catch (const Xapian::DocNotFoundError & e) { // oh well... - warnx("doc not found: %s", e.get_description().c_str()); - if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko"); - } - return ITER_OK; -} - #ifndef WBUF_FLUSH_THRESHOLD # define WBUF_FLUSH_THRESHOLD (BUFSIZ - 1000) #endif @@ -63,7 +20,9 @@ static bool cmd_mset(struct req *req) Xapian::MSet mset = req->code_search ? commit_mset(req, qry_str) : mail_mset(req, qry_str); fbuf_init(&wbuf); - fprintf(wbuf.fp, "mset.size=%llu\n", (unsigned long long)mset.size()); + fprintf(wbuf.fp, "mset.size=%llu .get_matches_estimated=%llu\n", + (unsigned long long)mset.size(), + (unsigned long long)mset.get_matches_estimated()); int fd = fileno(req->fp[0]); for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { off_t off = ftello(wbuf.fp); @@ -82,12 +41,10 @@ static bool cmd_mset(struct req *req) if (fseeko(wbuf.fp, 0, SEEK_SET)) EABORT("fseeko"); off = 0; } - for (int t = 10; t > 0; --t) - switch (mset_iter(req, wbuf.fp, off, &i)) { - case ITER_OK: t = 0; break; // leave inner loop - case ITER_RETRY: break; // continue for-loop - case ITER_ABORT: return false; // error - } + fprintf(wbuf.fp, "%llu" "%c" "%d" "%c" "%llu\n", + (unsigned long long)(*i), // get_docid + 0, i.get_percent(), + 0, (unsigned long long)i.get_rank()); } off_t off = ftello(wbuf.fp); if (off < 0) EABORT("ftello"); diff --git a/script/public-inbox-extindex b/script/public-inbox-extindex index bee824b1..2e5a5d2c 100755 --- a/script/public-inbox-extindex +++ b/script/public-inbox-extindex @@ -32,7 +32,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i indexlevel|index-level|L=s max_size|max-size=s batch_size|batch-size=s dedupe:s@ gc commit-interval=i watch scan! dry-run|n - all C=s@ help|h)) + multi-pack-index! all C=s@ help|h)) or die $help; if ($opt->{help}) { print $help; exit 0 }; die "--jobs must be >= 0\n" if defined $opt->{jobs} && $opt->{jobs} < 0; diff --git a/script/public-inbox-index b/script/public-inbox-index index 74232ebf..a13e44bf 100755 --- a/script/public-inbox-index +++ b/script/public-inbox-index @@ -44,6 +44,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune batch_size|batch-size=s since|after=s until|before=s sequential-shard|seq-shard + multi-pack-index! no-update-extindex update-extindex|E=s@ fast-noop|F skip-docdata all C=s@ help|h)) or die $help; @@ -6,7 +6,7 @@ use PublicInbox::TestCommon; use Cwd qw(getcwd); use List::Util qw(sum); use autodie qw(close mkdir open rename); -require_mods(qw(json Xapian +SCM_RIGHTS)); +require_mods(qw(json Xapian +SCM_RIGHTS DBD::SQLite)); use_ok 'PublicInbox::CodeSearchIdx'; use PublicInbox::Import; my ($tmp, $for_destroy) = tmpdir(); @@ -147,26 +147,28 @@ if ('multi-repo search') { my $test_xhc = sub { my ($xhc) = @_; + my $csrch = PublicInbox::CodeSearch->new("$tmp/ext"); my $impl = $xhc->{impl}; my ($r, @l); - $r = $xhc->mkreq([], qw(mset -D -c -g), $zp_git, @xh_args, 'NUL'); + $r = $xhc->mkreq([], qw(mset -c -g), $zp_git, @xh_args, 'NUL'); chomp(@l = <$r>); - is(shift(@l), 'mset.size=2', "got expected header $impl"); + like shift(@l), qr/\bmset\.size=2\b/, "got expected header $impl"; my %docid2data; my @got = sort map { - my @f = split /\0/; - is scalar(@f), 2, 'got 2 entries'; - $docid2data{$f[0]} = $f[1]; - $f[1]; + my ($docid, $pct, $rank, @extra) = split /\0/; + ok $pct >= 0 && $pct <= 100, 'pct in range'; + ok $rank >= 0 && $rank <= 100000, 'rank ok'; + is scalar(@extra), 0, 'no extra fields'; + $docid2data{$docid} = + $csrch->xdb->get_document($docid)->get_data; } @l; is_deeply(\@got, $exp, "expected doc_data $impl"); $r = $xhc->mkreq([], qw(mset -c -g), "$tmp/wt0/.git", @xh_args, 'NUL'); chomp(@l = <$r>); - is(shift(@l), 'mset.size=0', "got miss in wrong dir $impl"); + like shift(@l), qr/\bmset.size=0\b/, "got miss in wrong dir $impl"; is_deeply(\@l, [], "no extra lines $impl"); - my $csrch = PublicInbox::CodeSearch->new("$tmp/ext"); while (my ($did, $expect) = each %docid2data) { is_deeply($csrch->xdb->get_document($did)->get_data, $expect, "docid=$did data matches"); @@ -179,14 +181,15 @@ SKIP: { require_mods('+SCM_RIGHTS', 1); require PublicInbox::XapClient; my $xhc = PublicInbox::XapClient::start_helper('-j0'); - $test_xhc->($xhc); + my $csrch = PublicInbox::CodeSearch->new("$tmp/ext"); + $test_xhc->($xhc, $csrch); skip 'PI_NO_CXX set', 1 if $ENV{PI_NO_CXX}; $xhc->{impl} =~ /Cxx/ or skip 'C++ compiler or xapian development libs missing', 1; skip 'TEST_XH_CXX_ONLY set', 1 if $ENV{TEST_XH_CXX_ONLY}; local $ENV{PI_NO_CXX} = 1; # force XS or SWIG binding test $xhc = PublicInbox::XapClient::start_helper('-j0'); - $test_xhc->($xhc); + $test_xhc->($xhc, $csrch); } if ('--update') { diff --git a/t/extsearch.t b/t/extsearch.t index 090f6db5..797aa8f5 100644 --- a/t/extsearch.t +++ b/t/extsearch.t @@ -559,6 +559,15 @@ EOM for (@xdb) { ok(!$_->get_metadata('indexlevel'), 'no indexlevel in >0 shard') } + my $mpi = "$d/ALL.git/objects/pack/multi-pack-index"; + SKIP: { + skip 'git too old for for multi-pack-index', 2 if !-f $mpi; + unlink glob("$d/ALL.git/objects/pack/*"); + ok run_script([qw(-extindex --all -L medium -j3 + --no-multi-pack-index), $d]), + 'test --no-multi-pack-index'; + ok !-f $mpi, '--no-multi-pack-index respected'; + } } test_lei(sub { diff --git a/t/imap_searchqp.t b/t/imap_searchqp.t index ff1b4535..d7840dd0 100644 --- a/t/imap_searchqp.t +++ b/t/imap_searchqp.t @@ -3,6 +3,8 @@ # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> use strict; use v5.10.1; +use autodie qw(open seek read); +use Fcntl qw(SEEK_SET); use Time::Local qw(timegm); use PublicInbox::TestCommon; require_mods(qw(-imapd)); @@ -29,12 +31,15 @@ is($q->{xap}, 'f:"b"', 'charset handled'); $q = $parse->(qq{CHARSET WTF-8 From b}); like($q, qr/\ANO \[/, 'bad charset rejected'); -for my $x ('', ' (try #2)') { - open my $fh, '>:scalar', \(my $buf = '') or die; - local *STDERR = $fh; +{ + open my $tmperr, '+>', undef; + open my $olderr, '>&', \*STDERR; + open STDERR, '>&', $tmperr; $q = $parse->(qq{CHARSET}); - last if is($buf, '', "nothing spewed to STDERR on bad query$x"); - diag 'FIXME: above fails mysteriously sometimes, so we try again...'; + open STDERR, '>&', $olderr; + seek $tmperr, 0, SEEK_SET; + read($tmperr, my $buf, -s $tmperr); + is($buf, '', 'nothing spewed to STDERR on bad query'); } like($q, qr/\ABAD /, 'bad charset rejected'); diff --git a/t/psgi_v2.t b/t/psgi_v2.t index d5c328f0..2b678fd8 100644 --- a/t/psgi_v2.t +++ b/t/psgi_v2.t @@ -9,6 +9,7 @@ require_git(2.6); use PublicInbox::Eml; use PublicInbox::Config; use PublicInbox::MID qw(mids); +use autodie qw(kill rename); require_mods(qw(DBD::SQLite Xapian HTTP::Request::Common Plack::Test URI::Escape Plack::Builder HTTP::Date)); use_ok($_) for (qw(HTTP::Request::Common Plack::Test)); @@ -394,4 +395,57 @@ my $client3 = sub { test_psgi(sub { $www->call(@_) }, $client3); test_httpd($env, $client3, 4); +if ($^O eq 'linux' && -r "/proc/$$/stat") { + my $args; + my $search_xh_pid = sub { + my ($pid) = @_; + for my $f (glob('/proc/*/stat')) { + open my $fh, '<', $f or next; + my @s = split /\s+/, readline($fh) // next; + next if $s[3] ne $pid; # look for matching PPID + open $fh, '<', "/proc/$s[0]/cmdline" or next; + my $cmdline = readline($fh) // next; + if ($cmdline =~ /\0-MPublicInbox::XapHelper\0-e\0/ || + $cmdline =~ m!/xap_helper\0!) { + return $s[0]; + } + } + undef; + }; + my $usr1_test = sub { + my ($cb) = @_; + my $td = $PublicInbox::TestCommon::CURRENT_DAEMON; + my $pid = $td->{pid}; + my $res = $cb->(GET('/v2test/?q=m:a-mid@b')); + is $res->code, 200, '-httpd is running w/ search'; + + $search_xh_pid->($pid); + my $xh_pid = $search_xh_pid->($pid) or + BAIL_OUT "can't find XH pid with $args"; + my $xh_err = readlink "/proc/$xh_pid/fd/2"; + is $xh_err, "$env->{TMPDIR}/stderr.log", + "initial stderr expected ($args)"; + rename "$env->{TMPDIR}/stderr.log", + "$env->{TMPDIR}/stderr.old"; + $xh_err = readlink "/proc/$xh_pid/fd/2"; + is $xh_err, "$env->{TMPDIR}/stderr.old", + "stderr followed rename ($args)"; + kill 'USR1', $pid; + tick; + $res = $cb->(GET('/v2test/?q=m:a-mid@b')); + is $res->code, 200, '-httpd still running w/ search'; + my $new_xh_pid = $search_xh_pid->($pid) or + BAIL_OUT "can't find new XH pid with $args"; + is $new_xh_pid, $xh_pid, "XH pid unchanged ($args)"; + $xh_err = readlink "/proc/$new_xh_pid/fd/2"; + is $xh_err, "$env->{TMPDIR}/stderr.log", + "stderr updated ($args)"; + }; + for my $x ('-X0', '-X1', '-X0 -W1', '-X1 -W1') { + $args = $x; + local $ENV{TEST_DAEMON_XH} = $args; + test_httpd($env, $usr1_test); + } +} + done_testing; diff --git a/t/xap_helper.t b/t/xap_helper.t index 0f474608..78be8539 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -9,6 +9,7 @@ use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM); require PublicInbox::AutoReap; use PublicInbox::IPC; require PublicInbox::XapClient; +use PublicInbox::DS qw(now); use autodie; my ($tmp, $for_destroy) = tmpdir(); @@ -204,44 +205,35 @@ for my $n (@NO_CXX) { $err = do { local $/; <$err_r> }; is $err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})"; - $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args, + $r = $xhc->mkreq([], qw(mset), @ibx_shard_args, 'dfn:lib/PublicInbox/Search.pm'); chomp((my $hdr, @res) = readline($r)); - is $hdr, 'mset.size=1', "got expected header via mset ($xhc->{impl}"; + like $hdr, qr/\bmset\.size=1\b/, + "got expected header via mset ($xhc->{impl}"; is scalar(@res), 1, 'got one result'; @res = split /\0/, $res[0]; { my $doc = $v2->search->xdb->get_document($res[0]); + ok $doc, 'valid document retrieved'; my @q = PublicInbox::Search::xap_terms('Q', $doc); is_deeply \@q, [ $mid ], 'docid usable'; } ok $res[1] > 0 && $res[1] <= 100, 'pct > 0 && <= 100'; - is $res[2], 'XDFID'.$dfid, 'XDFID result matches'; - is $res[3], 'Q'.$mid, 'Q (msgid) mset result matches'; - is scalar(@res), 4, 'only 4 columns in result'; + is scalar(@res), 3, 'only 3 columns in result'; - $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args, + $r = $xhc->mkreq([], qw(mset), @ibx_shard_args, 'dt:19700101'.'000000..'); chomp(($hdr, @res) = readline($r)); - is $hdr, 'mset.size=6', + like $hdr, qr/\bmset\.size=6\b/, "got expected header via multi-result mset ($xhc->{impl}"; is(scalar(@res), 6, 'got 6 rows'); for my $r (@res) { - my ($docid, $pct, @rest) = split /\0/, $r; + my ($docid, $pct, $rank, @rest) = split /\0/, $r; my $doc = $v2->search->xdb->get_document($docid); ok $pct > 0 && $pct <= 100, "pct > 0 && <= 100 #$docid ($xhc->{impl})"; - my %terms; - for (@rest) { - s/\A([A-Z]+)// or xbail 'no prefix=', \@rest; - push @{$terms{$1}}, $_; - } - while (my ($pfx, $vals) = each %terms) { - @$vals = sort @$vals; - my @q = PublicInbox::Search::xap_terms($pfx, $doc); - is_deeply $vals, \@q, - "#$docid $pfx as expected ($xhc->{impl})"; - } + like $rank, qr/\A\d+\z/, 'rank is a digit'; + is scalar(@rest), 0, 'no extra rows returned'; } my $nr; for my $i (7, 8, 39, 40) { @@ -276,6 +268,20 @@ for my $n (@NO_CXX) { my @oids = (join('', @res) =~ /^([a-f0-9]{7}) /gms); is $nr_out, scalar(@oids), "output count matches $xhc->{impl}" or diag explain(\@res, \@err); + + if ($ENV{TEST_XH_TIMEOUT}) { + diag 'testing timeouts...'; + for my $j (qw(0 1)) { + my $t0 = now; + $r = $xhc->mkreq(undef, qw(test_sleep -K 1 -d), + $ibx_idx[0]); + is readline($r), undef, 'got EOF'; + my $diff = now - $t0; + ok $diff < 3, "timeout didn't take too long -j$j"; + ok $diff >= 0.9, "timeout didn't fire prematurely -j$j"; + $xhc = PublicInbox::XapClient::start_helper('-j1'); + } + } } done_testing; |