diff options
Diffstat (limited to 'lib')
78 files changed, 1768 insertions, 769 deletions
diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm index a1b1fc07..bb5d3653 100644 --- a/lib/PublicInbox/Admin.pm +++ b/lib/PublicInbox/Admin.pm @@ -11,6 +11,7 @@ use PublicInbox::Config; use PublicInbox::Inbox; use PublicInbox::Spawn qw(run_qx); use PublicInbox::Eml; +use PublicInbox::Git qw(git_exe); *rel2abs_collapsed = \&PublicInbox::Config::rel2abs_collapsed; sub setup_signals { @@ -77,7 +78,7 @@ sub resolve_git_dir { my $env; defined($pwd) && substr($cd // '/', 0, 1) ne '/' and $env->{PWD} = "$pwd/$cd"; - my $cmd = [ qw(git rev-parse --git-dir) ]; + my $cmd = [ git_exe, qw(rev-parse --git-dir) ]; my $dir = run_qx($cmd, $env, { -C => $cd }); die "error in @$cmd (cwd:${\($cd // '.')}): $?\n" if $?; chomp $dir; @@ -317,7 +318,7 @@ sub progress_prepare ($;$) { $opt->{1} = $null; # suitable for spawn() redirect } else { $opt->{verbose} ||= 1; - $dst //= *STDERR{GLOB}; + $dst //= \*STDERR; $opt->{-progress} = sub { print $dst '# ', @_ }; } } diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm index 2f102ec6..fc77bd03 100644 --- a/lib/PublicInbox/CmdIPC4.pm +++ b/lib/PublicInbox/CmdIPC4.pm @@ -11,8 +11,8 @@ use Socket qw(SOL_SOCKET SCM_RIGHTS); sub sendmsg_retry ($) { return 1 if $!{EINTR}; return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS}); - return if ++$_[0] >= 50; - warn "# sleeping on sendmsg: $! (#$_[0])\n"; + return if --$_[0] < 0; + warn "# sleeping on sendmsg: $! 
($_[0] tries left)\n"; select(undef, undef, undef, 0.1); 1; } @@ -22,15 +22,15 @@ require Socket::MsgHdr; # XS no warnings 'once'; # any number of FDs per-sendmsg(2) + buffer -*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_; - my ($sock, $fds, undef, $flags) = @_; +*send_cmd4 = sub ($$$$;$) { # (sock, fds, buf, flags) = @_; + my ($sock, $fds, undef, $flags, $tries) = @_; + $tries //= 50; my $mh = Socket::MsgHdr->new(buf => $_[2]); $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('i' x scalar(@$fds), @$fds)); my $s; - my $try = 0; do { $s = Socket::MsgHdr::sendmsg($sock, $mh, $flags); - } while (!defined($s) && sendmsg_retry($try)); + } while (!defined($s) && sendmsg_retry($tries)); $s; }; diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 570ff64f..6d777bf6 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -368,7 +368,7 @@ sub repo_stored { $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did"; my ($c, $p) = PublicInbox::PktOp->pair; $c->{ops}->{shard_done} = [ $self, $repo_ctx, - PublicInbox::OnDestroy->new(\&next_repos, $repo_ctx, $drs)]; + on_destroy(\&next_repos, $repo_ctx, $drs)]; # shard_done fires when all shards are committed my @active = keys %{$repo_ctx->{active}}; $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active; @@ -425,7 +425,7 @@ sub fp_start ($$) { open my $refs, '+>', undef; $git->{-repo}->{refs} = $refs; my ($c, $p) = PublicInbox::PktOp->pair; - my $next_on_err = PublicInbox::OnDestroy->new(\&index_next, $self); + my $next_on_err = on_destroy \&index_next, $self; $c->{ops}->{fp_done} = [ $self, $git, $next_on_err ]; $IDX_SHARDS[++$ANY_SHARD % scalar(@IDX_SHARDS)]->wq_io_do('fp_async', [ $p->{op_p}, $refs ], $git->{git_dir}) @@ -664,8 +664,7 @@ sub index_repo { my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo }; delete $git->{-cidx_gits_fini}; # may fire gits_fini my $drs = delete $git->{-cidx_dump_roots_start}; - my $index_done = 
PublicInbox::OnDestroy->new(\&index_done, - $repo_ctx, $drs); + my $index_done = on_destroy \&index_done, $repo_ctx, $drs; my ($c, $p) = PublicInbox::PktOp->pair; $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ]; for my $n (0..$#shard_in) { @@ -690,7 +689,7 @@ sub ct_fini { # run_git cb sub prep_repo ($$) { my ($self, $git) = @_; return if $DO_QUIT; - my $index_repo = PublicInbox::OnDestroy->new(\&index_repo, $self, $git); + my $index_repo = on_destroy \&index_repo, $self, $git; my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET); open my $roots_fh, '+>', undef; @@ -787,7 +786,7 @@ sub scan_git_dirs ($) { my ($self) = @_; @$SCANQ = () unless $self->{-opt}->{scan}; $GITS_NR = @$SCANQ or return; - my $gits_fini = PublicInbox::OnDestroy->new(\&gits_fini); + my $gits_fini = on_destroy \&gits_fini; $_->{-cidx_gits_fini} = $gits_fini for @$SCANQ; if (my $drs = $TODO{dump_roots_start}) { $_->{-cidx_dump_roots_start} = $drs for @$SCANQ; @@ -859,7 +858,7 @@ sub prep_umask ($) { umask == $um or progress($self, 'using umask from ', $self->{cidx_dir}, ': ', sprintf('0%03o', $um)); - PublicInbox::OnDestroy->new(\&CORE::umask, umask($um)); + on_destroy \&CORE::umask, umask($um); } else { $self->{umask} = umask; # for SearchIdx->with_umask undef; @@ -1083,12 +1082,12 @@ EOM ($JOIN_DT[1]) = ($QRY_STR =~ /\.\.([0-9]{14})\z/); # YYYYmmddHHMMSS ($JOIN_DT[0]) = ($QRY_STR =~ /\Adt:([0-9]{14})/); # YYYYmmddHHMMSS $JOIN_DT[0] //= '19700101'.'000000'; # git uses unsigned times - $TODO{do_join} = PublicInbox::OnDestroy->new(\&do_join, $self); + $TODO{do_join} = on_destroy \&do_join, $self; $TODO{joining} = 1; # keep shards_active() happy - $TODO{dump_ibx_start} = PublicInbox::OnDestroy->new(\&dump_ibx_start, - $self, $TODO{do_join}); - $TODO{dump_roots_start} = PublicInbox::OnDestroy->new( - \&dump_roots_start, $self, $TODO{do_join}); + $TODO{dump_ibx_start} = on_destroy \&dump_ibx_start, + $self, $TODO{do_join}; + 
$TODO{dump_roots_start} = on_destroy \&dump_roots_start, + $self, $TODO{do_join}; progress($self, "will join in $QRY_STR date range..."); my $id = -1; @IBXQ = map { ++$id } @IBX; @@ -1110,8 +1109,7 @@ sub init_prune ($) { require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed, comm => \@COMM, awk => \@AWK); for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" } - my $run_prune = PublicInbox::OnDestroy->new(\&run_prune, $self, - $TODO{dump_roots_start}); + my $run_prune = on_destroy \&run_prune, $self, $TODO{dump_roots_start}; my ($sort_opt, $sed_opt, $delve_opt); pipe(local $sed_opt->{0}, local $delve_opt->{1}); pipe(local $sort_opt->{0}, local $sed_opt->{1}); @@ -1279,8 +1277,7 @@ sub cidx_run { # main entry point my $restore_umask = prep_umask($self); local $SIGSET = PublicInbox::DS::block_signals( POSIX::SIGTSTP, POSIX::SIGCONT); - my $restore = PublicInbox::OnDestroy->new($$, - \&PublicInbox::DS::sig_setmask, $SIGSET); + my $restore = on_destroy \&PublicInbox::DS::sig_setmask, $SIGSET; local $PRUNE_DONE = []; local $IDXQ = []; local $SCANQ = []; diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm index d6300610..998fc25e 100644 --- a/lib/PublicInbox/Config.pm +++ b/lib/PublicInbox/Config.pm @@ -13,6 +13,7 @@ use v5.10.1; use parent qw(Exporter); our @EXPORT_OK = qw(glob2re rel2abs_collapsed); use PublicInbox::Inbox; +use PublicInbox::Git qw(git_exe); use PublicInbox::Spawn qw(popen_rd run_qx); our $LD_PRELOAD = $ENV{LD_PRELOAD}; # only valid at startup our $DEDUPE; # set to {} to dedupe or clear cache @@ -188,7 +189,7 @@ sub git_config_dump { unshift(@opt_c, '-c', "include.path=$file") if defined($file); tmp_cmd_opt(\%env, $opt); } - my @cmd = ('git', @opt_c, qw(config -z -l --includes)); + my @cmd = (git_exe, @opt_c, qw(config -z -l --includes)); push(@cmd, '-f', $file) if !@opt_c && defined($file); my $fh = popen_rd(\@cmd, \%env, $opt); my $rv = config_fh_parse($fh, "\0", "\n"); @@ -516,7 +517,9 @@ sub _fill_ibx { delete 
$ibx->{newsgroup}; warn "newsgroup name invalid: `$ngname'\n"; } else { - my $lc = $ibx->{newsgroup} = lc $ngname; + %dedupe = (lc($ngname) => undef); + my ($lc) = keys %dedupe; + $ibx->{newsgroup} = $lc; warn <<EOM if $lc ne $ngname; W: newsgroup=`$ngname' lowercased to `$lc' EOM @@ -608,7 +611,7 @@ sub config_cmd { my ($self, $env, $opt) = @_; my $f = $self->{-f} // default_file(); my @opt_c = @{$self->{-opt_c} // []}; - my @cmd = ('git', @opt_c, 'config'); + my @cmd = (git_exe, @opt_c, 'config'); @opt_c ? tmp_cmd_opt($env, $opt) : push(@cmd, '-f', $f); \@cmd; } @@ -629,7 +632,7 @@ sub urlmatch { } elsif (($? >> 8) != 1) { $urlmatch_broken = 1; } elsif ($try_git) { # n.b. this takes cwd into account - $val = run_qx([qw(git config), @bool, + $val = run_qx([$cmd->[0], 'config', @bool, qw(-z --get-urlmatch), $key, $url]); undef $val if $?; } diff --git a/lib/PublicInbox/ConfigIter.pm b/lib/PublicInbox/ConfigIter.pm index 14fcef83..f9e3451a 100644 --- a/lib/PublicInbox/ConfigIter.pm +++ b/lib/PublicInbox/ConfigIter.pm @@ -1,12 +1,11 @@ -# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # Intended for PublicInbox::DS::event_loop in read-only daemons # to avoid each_inbox() monopolizing the event loop when hundreds/thousands # of inboxes are in play. 
package PublicInbox::ConfigIter; -use strict; -use v5.10.1; +use v5.12; sub new { my ($class, $pi_cfg, $cb, @args) = @_; @@ -25,7 +24,7 @@ sub event_step { PublicInbox::DS::requeue($self) if defined($section); } -# for generic PSGI servers +# for generic PSGI servers, but also ManifestJsGz w/ ALL extindex sub each_section { my $self = shift; my ($pi_cfg, $i, $cb, @arg) = @$self; diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 8bc8cfb7..a6fec954 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -32,9 +32,9 @@ use PublicInbox::Syscall qw(%SIGNUM EPOLLIN EPOLLOUT EPOLLONESHOT EPOLLEXCLUSIVE); use PublicInbox::Tmpfile; use PublicInbox::Select; +use PublicInbox::OnDestroy; use Errno qw(EAGAIN EINVAL ECHILD); use Carp qw(carp croak); -use autodie qw(fork); our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer); my $nextq; # queue for next_tick @@ -679,12 +679,13 @@ sub awaitpid { } } -sub do_fork () { +# for persistent child process +sub fork_persist () { my $seed = rand(0xffffffff); - my $pid = fork; + my $pid = PublicInbox::OnDestroy::fork_tmp; if ($pid == 0) { srand($seed); - eval { Net::SSLeay::randomize() }; + eval { Net::SSLeay::randomize() }; # may not be loaded Reset(); } $pid; diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm index f84c196a..dc6621e4 100644 --- a/lib/PublicInbox/DSKQXS.pm +++ b/lib/PublicInbox/DSKQXS.pm @@ -15,6 +15,7 @@ use v5.12; use Symbol qw(gensym); use IO::KQueue; use Errno qw(EAGAIN); +use PublicInbox::OnDestroy; use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLLET); sub EV_DISPATCH () { 0x0080 } @@ -37,7 +38,8 @@ sub kq_flag ($$) { sub new { my ($class) = @_; - bless { kq => IO::KQueue->new, owner_pid => $$ }, $class; + my $fgen = $PublicInbox::OnDestroy::fork_gen; + bless { kq => IO::KQueue->new, fgen => $fgen }, $class; } # returns a new instance which behaves like signalfd on Linux. 
@@ -137,9 +139,8 @@ sub ep_wait { sub DESTROY { my ($self) = @_; my $kq = delete $self->{kq} or return; - if (delete($self->{owner_pid}) == $$) { + delete($self->{fgen}) == $PublicInbox::OnDestroy::fork_gen and POSIX::close($$kq); - } } 1; diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index e578f2e8..28458b19 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -21,9 +21,12 @@ use PublicInbox::Git; use PublicInbox::GitAsyncCat; use PublicInbox::Eml; use PublicInbox::Config; +use PublicInbox::OnDestroy; +use PublicInbox::Search; +use PublicInbox::XapClient; our $SO_ACCEPTFILTER = 0x1000; my @CMD; -my ($set_user, $oldset); +my ($set_user, $oldset, $xh_workers); my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); my ($nworker, @listeners, %WORKERS, %logs); my %tls_opt; # scheme://sockname => args for IO::Socket::SSL::SSL_Context->new @@ -169,6 +172,7 @@ options: --cert=FILE default SSL/TLS certificate --key=FILE default SSL/TLS certificate key -W WORKERS number of worker processes to spawn (default: 1) + -X XWORKERS number of Xapian helper processes (default: undefined) See public-inbox-daemon(8) and $prog(1) man pages for more. 
EOF @@ -184,6 +188,7 @@ EOF 'multi-accept=i' => \$PublicInbox::Listener::MULTI_ACCEPT, 'cert=s' => \$default_cert, 'key=s' => \$default_key, + 'X|xapian-helpers=i' => \$xh_workers, 'help|h' => \(my $show_help), ); GetOptions(%opt) or die $help; @@ -338,22 +343,20 @@ EOF }; if ($daemonize) { - my $pid = fork // die "fork: $!"; + my $pid = PublicInbox::OnDestroy::fork_tmp; exit if $pid; - open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n"; open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n"; open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n"; POSIX::setsid(); - $pid = fork // die "fork: $!"; + $pid = PublicInbox::OnDestroy::fork_tmp; exit if $pid; } return unless defined $pid_file; write_pid($pid_file); - # for ->DESTROY: - bless { pid => $$, pid_file => \$pid_file }, __PACKAGE__; + on_destroy \&unlink_pid_file_safe_ish, \$pid_file; } sub has_busy_clients { # post_loop_do CB @@ -385,10 +388,30 @@ sub worker_quit { # $_[0] = signal name or number (unused) @PublicInbox::DS::post_loop_do = (\&has_busy_clients, { -w => 0 }) } +sub spawn_xh () { + $xh_workers // return; + require PublicInbox::XhcMset; + local $) = $gid if defined $gid; + local $( = $gid if defined $gid; + local $> = $uid if defined $uid; + local $< = $uid if defined $uid; + $PublicInbox::Search::XHC = eval { + local $ENV{STDERR_PATH} = $stderr; + local $ENV{STDOUT_PATH} = $stdout; + PublicInbox::XapClient::start_helper('-j', $xh_workers) + }; + warn "E: $@" if $@; + awaitpid($PublicInbox::Search::XHC->{io}->attached_pid, \&respawn_xh) + if $PublicInbox::Search::XHC; +} + sub reopen_logs { + my ($sig) = @_; $logs{$stdout} //= \*STDOUT if defined $stdout; $logs{$stderr} //= \*STDERR if defined $stderr; while (my ($p, $fh) = each %logs) { open_log_path($fh, $p) } + ($sig && defined($xh_workers) && $PublicInbox::Search::XHC) and + kill('USR1', $PublicInbox::Search::XHC->{io}->attached_pid); } sub sockname ($) { @@ -476,13 +499,13 @@ sub upgrade { # $_[0] = signal name or 
number (unused) warn "BUG: .oldbin suffix exists: $pid_file\n"; return; } - unlink_pid_file_safe_ish($$, $pid_file); + unlink_pid_file_safe_ish(\$pid_file); $pid_file .= '.oldbin'; write_pid($pid_file); } - my $pid = fork; + my $pid = eval { PublicInbox::OnDestroy::fork_tmp }; if (!defined($pid)) { - warn "fork failed: $!\n"; + warn "fork failed: $! $@\n"; } elsif ($pid == 0) { $ENV{LISTEN_FDS} = scalar @listeners; $ENV{LISTEN_PID} = $$; @@ -509,23 +532,20 @@ sub upgrade_aborted { my $file = $pid_file; $file =~ s/\.oldbin\z// or die "BUG: no '.oldbin' suffix in $file"; - unlink_pid_file_safe_ish($$, $pid_file); + unlink_pid_file_safe_ish(\$pid_file); $pid_file = $file; eval { write_pid($pid_file) }; warn $@, "\n" if $@; } -sub unlink_pid_file_safe_ish ($$) { - my ($unlink_pid, $file) = @_; - return unless defined $unlink_pid && $unlink_pid == $$; +sub unlink_pid_file_safe_ish ($) { + my ($fref) = @_; - open my $fh, '<', $file or return; + open my $fh, '<', $$fref or return; local $/ = "\n"; defined(my $read_pid = <$fh>) or return; chomp $read_pid; - if ($read_pid == $unlink_pid) { - Net::Server::Daemonize::unlink_pid_file($file); - } + Net::Server::Daemonize::unlink_pid_file($$fref) if $read_pid == $$; } sub master_quit ($) { @@ -545,9 +565,10 @@ sub reap_worker { # awaitpid CB sub start_worker ($) { my ($nr) = @_; return unless @listeners; - my $pid = PublicInbox::DS::do_fork; + my $pid = PublicInbox::DS::fork_persist; if ($pid == 0) { undef %WORKERS; + undef $xh_workers; local $PublicInbox::DS::Poller; # allow epoll/kqueue $set_user->() if $set_user; PublicInbox::EOFpipe->new($parent_pipe, \&worker_quit); @@ -575,8 +596,9 @@ sub master_loop { pipe($parent_pipe, my $p1) or die "failed to create parent-pipe: $!"; my $set_workers = $nworker; # for SIGWINCH reopen_logs(); + spawn_xh; my $msig = { - USR1 => sub { reopen_logs(); kill_workers($_[0]); }, + USR1 => sub { reopen_logs($_[0]); kill_workers($_[0]); }, USR2 => \&upgrade, QUIT => \&master_quit, INT => 
\&master_quit, @@ -675,6 +697,7 @@ sub daemon_loop () { sub worker_loop { $uid = $gid = undef; reopen_logs(); + spawn_xh; # only for -W0 @listeners = map {; my $l = sockname($_); my $tls_cb = $POST_ACCEPT{$l}; @@ -691,22 +714,30 @@ sub worker_loop { PublicInbox::DS::event_loop(\%WORKER_SIG, $oldset); } +sub respawn_xh { # awaitpid cb + my ($pid) = @_; + return unless @listeners; + warn "W: xap_helper PID:$pid died: \$?=$?, respawning...\n"; + spawn_xh; +} + sub run { my ($default_listen) = @_; $nworker = 1; local (%XNETD, %POST_ACCEPT); daemon_prepare($default_listen); - my $for_destroy = daemonize(); + my $unlink_on_leave = daemonize(); # localize GCF2C for tests: local $PublicInbox::GitAsyncCat::GCF2C; local $PublicInbox::Git::async_warn = 1; local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); local %WORKER_SIG = %WORKER_SIG; - local %POST_ACCEPT; + local $PublicInbox::XapClient::tries = 0; + local $PublicInbox::Search::XHC if defined($xh_workers); daemon_loop(); - # ->DESTROY runs when $for_destroy goes out-of-scope + # $unlink_on_leave runs } sub write_pid ($) { @@ -715,8 +746,4 @@ sub write_pid ($) { do_chown($path); } -sub DESTROY { - unlink_pid_file_safe_ish($_[0]->{pid}, ${$_[0]->{pid_file}}); -} - 1; diff --git a/lib/PublicInbox/EOFpipe.pm b/lib/PublicInbox/EOFpipe.pm index 3474874f..77b699a2 100644 --- a/lib/PublicInbox/EOFpipe.pm +++ b/lib/PublicInbox/EOFpipe.pm @@ -7,8 +7,8 @@ use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT $F_SETPIPE_SZ); sub new { - my (undef, $rd, $cb) = @_; - my $self = bless { cb => $cb }, __PACKAGE__; + my (undef, $rd, @cb_args) = @_; + my $self = bless { cb_args => \@cb_args }, __PACKAGE__; # 4096: page size fcntl($rd, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT); @@ -17,7 +17,8 @@ sub new { sub event_step { my ($self) = @_; if ($self->do_read(my $buf, 1) == 0) { # auto-closed - $self->{cb}->(); + my ($cb, @args) = @{delete $self->{cb_args}}; + 
$cb->(@args); } } diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index ebbffffc..934197c0 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -543,13 +543,7 @@ sub _ibx_for ($$$) { sub _fd_constrained ($) { my ($self) = @_; $self->{-fd_constrained} //= do { - my $soft; - if (eval { require BSD::Resource; 1 }) { - my $NOFILE = BSD::Resource::RLIMIT_NOFILE(); - ($soft, undef) = BSD::Resource::getrlimit($NOFILE); - } else { - chomp($soft = `sh -c 'ulimit -n'`); - } + my $soft = PublicInbox::Search::ulimit_n; if (defined($soft)) { # $want is an estimate my $want = scalar(@{$self->{ibx_active}}) + 64; @@ -1287,11 +1281,11 @@ sub idx_init { # similar to V2Writable ($has_new || $prune_nr || $new ne '') and $self->{mg}->write_alternates($mode, $alt, $new); my $restore = $self->with_umask; - if ($git_midx) { - my @cmd = ('multi-pack-index'); - push @cmd, '--no-progress' if ($opt->{quiet}//0) > 1; + if ($git_midx && ($opt->{'multi-pack-index'} // 1)) { + my $cmd = $self->git->cmd('multi-pack-index'); + push @$cmd, '--no-progress' if ($opt->{quiet}//0) > 1; my $lk = $self->lock_for_scope; - system('git', "--git-dir=$ALL", @cmd, 'write'); + system(@$cmd, 'write'); # ignore errors, fairly new command, may not exist } $self->parallel_init($self->{indexlevel}); @@ -1424,5 +1418,6 @@ no warnings 'once'; *idx_shard = \&PublicInbox::V2Writable::idx_shard; *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint; *checkpoint = \&PublicInbox::V2Writable::checkpoint; +*barrier = \&PublicInbox::V2Writable::barrier; 1; diff --git a/lib/PublicInbox/Fetch.pm b/lib/PublicInbox/Fetch.pm index b0f1437c..814d6e8e 100644 --- a/lib/PublicInbox/Fetch.pm +++ b/lib/PublicInbox/Fetch.pm @@ -12,6 +12,7 @@ use PublicInbox::LeiCurl; use PublicInbox::LeiMirror; use PublicInbox::SHA qw(sha_all); use File::Temp (); +use PublicInbox::Git qw(git_exe); sub new { bless {}, __PACKAGE__ } @@ -19,7 +20,7 @@ sub remote_url ($$) { my 
($lei, $dir) = @_; my $rn = $lei->{opt}->{'try-remote'} // [ 'origin', '_grokmirror' ]; for my $r (@$rn) { - my $cmd = [ qw(git config), "remote.$r.url" ]; + my $cmd = [ git_exe, 'config', "remote.$r.url" ]; my $url = run_qx($cmd, undef, { -C => $dir, 2 => $lei->{2} }); next if $?; $url =~ s!/*\n!!s; @@ -92,7 +93,7 @@ sub do_manifest ($$$) { sub get_fingerprint2 { my ($git_dir) = @_; - my $rd = popen_rd([qw(git show-ref)], undef, { -C => $git_dir }); + my $rd = popen_rd([git_exe, 'show-ref'], undef, { -C => $git_dir }); sha_all(256, $rd)->digest; # ignore show-ref errors } @@ -132,8 +133,8 @@ sub do_fetch { # main entry point warn "W: $edir missing remote.*.url\n"; my $o = { -C => $edir }; $o->{1} = $o->{2} = $lei->{2}; - run_wait([qw(git config -l)], undef, $o) and - $lei->child_error($?); + run_wait([git_exe, qw(config -l)], undef, $o) + and $lei->child_error($?); } } @epochs = grep { !$skip->{$_} } @epochs if $skip; @@ -188,7 +189,7 @@ EOM my $opt = {}; # for spawn if (-d $d) { $fp2->[0] = get_fingerprint2($d) if $fp2; - $cmd = [ @$torsocks, 'git', "--git-dir=$d", + $cmd = [ @$torsocks, git_exe, "--git-dir=$d", PublicInbox::LeiMirror::fetch_args($lei, $opt)]; } else { my $e_uri = $ibx_uri->clone; diff --git a/lib/PublicInbox/Gcf2.pm b/lib/PublicInbox/Gcf2.pm index 78392990..acc2091c 100644 --- a/lib/PublicInbox/Gcf2.pm +++ b/lib/PublicInbox/Gcf2.pm @@ -11,7 +11,7 @@ use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use IO::Handle; # autoflush use PublicInbox::Git qw($ck_unlinked_packs); use PublicInbox::Lock; -use autodie qw(close open seek truncate); +use autodie qw(open seek truncate); BEGIN { my (%CFG, $c_src); diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index af12f141..32c11a59 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -10,6 +10,7 @@ package PublicInbox::Git; use strict; use v5.10.1; use parent qw(Exporter PublicInbox::DS); +use PublicInbox::DS qw(now); use autodie qw(socketpair read); use POSIX (); use Socket 
qw(AF_UNIX SOCK_STREAM); @@ -25,7 +26,7 @@ use PublicInbox::SHA qw(sha_all); our %HEXLEN2SHA = (40 => 1, 64 => 256); our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64); our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN - $ck_unlinked_packs); + $ck_unlinked_packs git_exe); our $in_cleanup; our $async_warn; # true in read-only daemons @@ -54,9 +55,16 @@ my %ESC_GIT = map { $GIT_ESC{$_} => $_ } keys %GIT_ESC; my $EXE_ST = ''; # pack('dd', st_dev, st_ino); # no `q' in some 32-bit builds my ($GIT_EXE, $GIT_VER); -sub check_git_exe () { +sub git_exe () { + my $now = now; + state $next_check = $now - 10; + return $GIT_EXE if $now < $next_check; + $next_check = $now + 10; $GIT_EXE = which('git') // die "git not found in $ENV{PATH}"; - my @st = stat(_) or die "stat($GIT_EXE): $!"; # can't do HiRes w/ _ +} + +sub git_version () { + my @st = stat(git_exe) or die "stat($GIT_EXE): $!"; my $st = pack('dd', $st[0], $st[1]); if ($st ne $EXE_ST) { my $v = run_qx([ $GIT_EXE, '--version' ]); @@ -66,11 +74,6 @@ sub check_git_exe () { $GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)"; $EXE_ST = $st; } - $GIT_EXE; -} - -sub git_version { - check_git_exe(); $GIT_VER; } @@ -103,9 +106,10 @@ sub new { sub git_path ($$) { my ($self, $path) = @_; $self->{-git_path}->{$path} //= do { - my $d = "$self->{git_dir}/$path"; - if (-e $d) { - $d; + my $d = $self->{git_dir}; + my $f = "$d/$path"; + if (-d "$d/objects") { + $f; } else { local $/ = "\n"; my $rdr = { 2 => \my $err }; @@ -114,7 +118,7 @@ sub git_path ($$) { chomp $s; # git prior to 2.5.0 did not understand --git-path - $s eq "--git-path\n$path" ? $d : $s; + $s eq "--git-path\n$path" ? $f : $s; } }; } @@ -174,7 +178,7 @@ sub _sock_cmd { # git 2.31.0+ supports -c core.abbrev=no, don't bother with # core.abbrev=64 since not many releases had SHA-256 prior to 2.31 - my $abbr = $GIT_VER lt v2.31.0 ? 40 : 'no'; + my $abbr = git_version lt v2.31.0 ? 
40 : 'no'; my @cmd = ($GIT_EXE, "--git-dir=$gd", '-c', "core.abbrev=$abbr", 'cat-file', "--$batch"); if ($err_c) { @@ -203,21 +207,21 @@ sub cat_async_retry ($$) { $oid = \$oid if !@$new_inflight; # to indicate oid retried push @$new_inflight, $oid, $cb, $arg; } - $sock->close if $sock; # only safe once old_inflight is empty + undef $sock; # gcf_drain may run from PublicInbox::IO::DESTROY cat_async_step($self, $new_inflight); # take one step } sub gcf_inflight ($) { my ($self) = @_; # FIXME: the first {sock} check can succeed but Perl can complain - # about calling ->owner_pid on an undefined value. Not sure why or - # how this happens but t/imapd.t can complain about it, sometimes. + # about an undefined value. Not sure why or how this happens but + # t/imapd.t can complain about it, sometimes. if ($self->{sock}) { - if (eval { $self->{sock}->owner_pid == $$ }) { + if (eval { $self->{sock}->can_reap }) { return $self->{inflight}; } elsif ($@) { no warnings 'uninitialized'; - warn "E: $self sock=$self->{sock}: owner_pid failed: ". + warn "E: $self sock=$self->{sock}: can_reap failed: ". "$@ (continuing...)"; } delete @$self{qw(sock inflight)}; @@ -287,8 +291,7 @@ sub cat_async_wait ($) { sub batch_prepare ($) { my ($self) = @_; - check_git_exe(); - if ($GIT_VER ge BATCH_CMD_VER) { + if (git_version ge BATCH_CMD_VER) { $self->{-bc} = 1; _sock_cmd($self, 'batch-command', 1); } else { @@ -344,8 +347,7 @@ sub ck { sub check_async_begin ($) { my ($self) = @_; cleanup($self) if alternates_changed($self); - check_git_exe(); - if ($GIT_VER ge BATCH_CMD_VER) { + if (git_version ge BATCH_CMD_VER) { $self->{-bc} = 1; _sock_cmd($self, 'batch-command', 1); } else { @@ -421,15 +423,15 @@ sub async_err ($$$$$) { sub cmd { my $self = shift; - [ $GIT_EXE // check_git_exe(), "--git-dir=$self->{git_dir}", @_ ] + [ git_exe(), "--git-dir=$self->{git_dir}", @_ ] } # $git->popen(qw(show f00)); # or # $git->popen(qw(show f00), { GIT_CONFIG => ... }, { 2 => ... 
}); sub popen { my ($self, $cmd) = splice(@_, 0, 2); - $cmd = [ 'git', "--git-dir=$self->{git_dir}", - ref($cmd) ? @$cmd : ($cmd, grep { defined && !ref } @_) ]; + $cmd = $self->cmd(ref($cmd) ? @$cmd : + ($cmd, grep { defined && !ref } @_)); popen_rd($cmd, grep { !defined || ref } @_); # env and opt } @@ -577,9 +579,8 @@ sub cloneurl { # templates/this--description in git.git sub manifest_entry { my ($self, $epoch, $default_desc) = @_; - check_git_exe(); my $gd = $self->{git_dir}; - my @git = ($GIT_EXE, "--git-dir=$gd"); + my @git = (git_exe, "--git-dir=$gd"); my $sr = popen_rd([@git, 'show-ref']); my $own = popen_rd([@git, qw(config gitweb.owner)]); my $mod = popen_rd([@git, @MODIFIED_DATE]); @@ -637,7 +638,8 @@ sub event_step { my ($self) = @_; my $inflight = gcf_inflight($self); if ($inflight && @$inflight) { - $self->cat_async_step($inflight); + eval { $self->cat_async_step($inflight) }; + warn "E: $self->{git_dir}: $@" if $@; return $self->close unless $self->{sock}; # don't loop here to keep things fair, but we must requeue # if there's already-read data in pi_io_rbuf @@ -662,10 +664,9 @@ sub watch_async ($) { sub close { my ($self) = @_; - my $sock = $self->{sock}; delete @$self{qw(-bc err_c inflight)}; delete($self->{epwatch}) ? 
$self->SUPER::close : delete($self->{sock}); - $sock->close if $sock; # calls gcf_drain via awaitpid + # gcf_drain may run from PublicInbox::IO::DESTROY } package PublicInbox::GitCheck; # only for git <2.36 diff --git a/lib/PublicInbox/GitCredential.pm b/lib/PublicInbox/GitCredential.pm index bb225ff3..cf5a2213 100644 --- a/lib/PublicInbox/GitCredential.pm +++ b/lib/PublicInbox/GitCredential.pm @@ -4,13 +4,14 @@ # git-credential wrapper with built-in .netrc fallback package PublicInbox::GitCredential; use v5.12; +use PublicInbox::Git qw(git_exe); use PublicInbox::Spawn qw(popen_rd); use autodie qw(close pipe); sub run ($$;$) { my ($self, $op, $lei) = @_; my ($in_r, $in_w, $out_r); - my $cmd = [ qw(git credential), $op ]; + my $cmd = [ git_exe, 'credential', $op ]; pipe($in_r, $in_w); if ($lei) { # we'll die if disconnected: pipe($out_r, my $out_w); diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index 396aa783..ac610d4b 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -106,7 +106,9 @@ sub serve_smart { $env{PATH_TRANSLATED} = "$git->{git_dir}/$path"; my $rdr = input_prepare($env) or return r(500); $rdr->{quiet} = 1; - my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr); + my $cmd = $git->cmd('http-backend'); + splice @$cmd, 1, 0, '-c', 'safe.directory=*'; + my $qsp = PublicInbox::Qspawn->new($cmd, \%env, $rdr); $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path); } diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm index 8b630f25..a2e82a2d 100644 --- a/lib/PublicInbox/GzipFilter.pm +++ b/lib/PublicInbox/GzipFilter.pm @@ -11,6 +11,8 @@ # async_eml callbacks only run when a blob arrives from git. # # We continue to support getline+close for generic PSGI servers. 
+# Note: we favor gzip in Perl (as opposed to nginx || varnish) to +# reduce IPC memory traffic package PublicInbox::GzipFilter; use strict; use parent qw(Exporter); @@ -21,7 +23,9 @@ use PublicInbox::GitAsyncCat; use Carp qw(carp); our @EXPORT_OK = qw(gzf_maybe); -my %OPT = (-WindowBits => 15 + 16, -AppendOutput => 1); +# Compress::Raw::Zlib uses MAX_MEM_LEVEL (9) while zlib DEF_MEM_LEVEL is 8; +# choose the zlib default because C:R:Z is excessive. +my %OPT = (-WindowBits => 15 + 16, -AppendOutput => 1, -MemLevel => 8); my @GZIP_HDRS = qw(Vary Accept-Encoding Content-Encoding gzip); sub new { bless {}, shift } # qspawn filter diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 7162732e..b7728bd2 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -47,7 +47,7 @@ open(my $null_io, '<', '/dev/null') or die "open /dev/null: $!"; { my @n = stat($null_io) or die "stat(/dev/null): $!"; my @i = stat(STDIN) or die "stat(STDIN): $!"; - $null_io = *STDIN{IO} if "@n[0, 1]" eq "@i[0, 1]"; + $null_io = \*STDIN if "@n[0, 1]" eq "@i[0, 1]"; } my $http_date; diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm index 5654f3b0..8640f112 100644 --- a/lib/PublicInbox/IO.pm +++ b/lib/PublicInbox/IO.pm @@ -10,12 +10,13 @@ our @EXPORT_OK = qw(poll_in read_all try_cat write_file); use Carp qw(croak); use IO::Poll qw(POLLIN); use Errno qw(EINTR EAGAIN); +use PublicInbox::OnDestroy; # don't autodie in top-level for Perl 5.16.3 (and maybe newer versions) # we have our own ->close, so we scope autodie into each sub sub waitcb { # awaitpid callback my ($pid, $errref, $cb, @args) = @_; - $$errref = $? if $errref; # sets .cerr for _close + $$errref = $?; # sets .cerr for _close $cb->($pid, @args) if $cb; # may clobber $? 
} @@ -23,8 +24,9 @@ sub attach_pid { my ($io, $pid, @cb_arg) = @_; bless $io, __PACKAGE__; # we share $err (and not $self) with awaitpid to avoid a ref cycle - ${*$io}{pi_io_reap} = [ $$, $pid, \(my $err) ]; - awaitpid($pid, \&waitcb, \$err, @cb_arg); + my $e = \(my $err); + ${*$io}{pi_io_reap} = [ $PublicInbox::OnDestroy::fork_gen, $pid, $e ]; + awaitpid($pid, \&waitcb, $e, @cb_arg); $io; } @@ -33,9 +35,9 @@ sub attached_pid { ${${*$io}{pi_io_reap} // []}[1]; } -sub owner_pid { +sub can_reap { my ($io) = @_; - ${${*$io}{pi_io_reap} // [-1]}[0]; + ${${*$io}{pi_io_reap} // [-1]}[0] == $PublicInbox::OnDestroy::fork_gen; } # caller cares about error result if they call close explicitly @@ -44,7 +46,7 @@ sub close { my ($io) = @_; my $ret = $io->SUPER::close; my $reap = delete ${*$io}{pi_io_reap}; - return $ret unless $reap && $reap->[0] == $$; + return $ret if ($reap->[0] // -1) != $PublicInbox::OnDestroy::fork_gen; if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously $? = ${$reap->[2]}; } else { # wait synchronously @@ -56,9 +58,9 @@ sub close { sub DESTROY { my ($io) = @_; my $reap = delete ${*$io}{pi_io_reap}; - if ($reap && $reap->[0] == $$) { + if (($reap->[0] // -1) == $PublicInbox::OnDestroy::fork_gen) { $io->SUPER::close; - awaitpid($reap->[1]); + ${$reap->[2]} // awaitpid($reap->[1]); } $io->SUPER::DESTROY; } diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index a5cae6f2..ed6d27fd 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -10,7 +10,7 @@ package PublicInbox::IPC; use v5.12; use parent qw(Exporter); -use autodie qw(close fork pipe read socketpair sysread); +use autodie qw(close pipe read socketpair sysread); use Carp qw(croak); use PublicInbox::DS qw(awaitpid); use PublicInbox::Spawn; @@ -93,6 +93,8 @@ sub ipc_worker_loop ($$$) { } } +sub exit_exception { exit(!!$@) } + # starts a worker if Sereal or Storable is installed sub ipc_worker_spawn { my ($self, $ident, $oldset, $fields, @cb_args) = @_; @@ 
-102,7 +104,7 @@ sub ipc_worker_spawn { pipe(my $r_res, my $w_res); my $sigset = $oldset // PublicInbox::DS::block_signals(); $self->ipc_atfork_prepare; - my $pid = PublicInbox::DS::do_fork; + my $pid = PublicInbox::DS::fork_persist; if ($pid == 0) { delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)}; $w_req = $r_res = undef; @@ -110,7 +112,7 @@ sub ipc_worker_spawn { $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; # ensure we properly exit even if warn() dies: - my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); + my $end = on_destroy \&exit_exception; eval { $fields //= {}; local @$self{keys %$fields} = values(%$fields); @@ -330,7 +332,7 @@ sub _wq_worker_start { my ($self, $oldset, $fields, $one, @cb_args) = @_; my ($bcast1, $bcast2); $one or socketpair($bcast1, $bcast2, AF_UNIX, SOCK_SEQPACKET, 0); - my $pid = PublicInbox::DS::do_fork; + my $pid = PublicInbox::DS::fork_persist; if ($pid == 0) { undef $bcast1; delete @$self{qw(-wq_s1 -wq_ppid)}; @@ -340,7 +342,7 @@ sub _wq_worker_start { local $0 = $one ? 
$self->{-wq_ident} : "$self->{-wq_ident} $self->{-wq_worker_nr}"; # ensure we properly exit even if warn() dies: - my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); + my $end = on_destroy \&exit_exception; eval { $fields //= {}; local @$self{keys %$fields} = values(%$fields); diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index ed34d548..2e193d46 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -8,6 +8,7 @@ package PublicInbox::Import; use v5.12; use parent qw(PublicInbox::Lock); +use PublicInbox::Git qw(git_exe); use PublicInbox::Spawn qw(run_die run_qx spawn); use PublicInbox::MID qw(mids mid2path); use PublicInbox::Address; @@ -24,7 +25,7 @@ use PublicInbox::IO qw(read_all); sub default_branch () { state $default_branch = do { - my $h = run_qx([qw(git config --global init.defaultBranch)], + my $h = run_qx([git_exe,qw(config --global init.defaultBranch)], { GIT_CONFIG => undef }); chomp $h; $h eq '' ? 'refs/heads/master' : "refs/heads/$h"; @@ -73,8 +74,8 @@ sub gfi_start { die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" 
if $?; $self->{-tree} = { map { $_ => 1 } split(/\0/, $t) }; } - my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import - --quiet --done --date-format=raw) ]; + my $gfi = $git->cmd(qw(fast-import + --quiet --done --date-format=raw)); my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 }); $self->{nchg} = 0; $self->{io} = PublicInbox::IO::attach_pid($io, $pid); @@ -161,7 +162,7 @@ sub _update_git_info ($$) { # for compatibility with existing ssoma installations # we can probably remove this entirely by 2020 my $git_dir = $self->{git}->{git_dir}; - my @cmd = ('git', "--git-dir=$git_dir"); + my @cmd = @{$self->{git}->cmd}; my $index = "$git_dir/ssoma.index"; if (-e $index && !$ENV{FAST}) { my $env = { GIT_INDEX_FILE => $index }; @@ -631,7 +632,7 @@ sub replace_oids { chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace; $self->{nchg} = 0; # prevent _update_git_info until update-ref: $self->done; - my @git = ('git', "--git-dir=$git->{git_dir}"); + my @git = @{$git->cmd}; run_die([@git, qw(update-ref), $old, $tmp]) if $nreplace; diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm index 62112171..20808d6d 100644 --- a/lib/PublicInbox/Isearch.pm +++ b/lib/PublicInbox/Isearch.pm @@ -26,34 +26,44 @@ SELECT ibx_id FROM inboxes WHERE eidx_key = ? LIMIT 1 sub query_approxidate { $_[0]->{es}->query_approxidate($_[1], $_[2]) } -sub mset { - my ($self, $str, $opt) = @_; +sub eidx_mset_prep ($$) { + my ($self, $opt) = @_; my %opt = $opt ? 
%$opt : (); $opt{eidx_key} = $self->{eidx_key}; - if (my $uid_range = $opt{uid_range}) { - my ($beg, $end) = @$uid_range; - my $ibx_id = $self->{-ibx_id} //= _ibx_id($self); - my $dbh = $self->{es}->over->dbh; - my $sth = $dbh->prepare_cached(<<'', undef, 1); + my $uid_range = $opt{uid_range} or return \%opt; + my ($beg, $end) = @$uid_range; + my $ibx_id = $self->{-ibx_id} //= _ibx_id($self); + my $dbh = $self->{es}->over->dbh; + my $sth = $dbh->prepare_cached(<<'', undef, 1); SELECT MIN(docid) FROM xref3 WHERE ibx_id = ? AND xnum >= ? AND xnum <= ? - $sth->execute($ibx_id, $beg, $end); - my @r = ($sth->fetchrow_array); + $sth->execute($ibx_id, $beg, $end); + my @r = ($sth->fetchrow_array); - $sth = $dbh->prepare_cached(<<'', undef, 1); + $sth = $dbh->prepare_cached(<<'', undef, 1); SELECT MAX(docid) FROM xref3 WHERE ibx_id = ? AND xnum >= ? AND xnum <= ? - $sth->execute($ibx_id, $beg, $end); - $r[1] = $sth->fetchrow_array; - if (defined($r[1]) && defined($r[0])) { - $opt{limit} = $r[1] - $r[0] + 1; - } else { - $r[1] //= $self->{es}->xdb->get_lastdocid; - $r[0] //= 0; - } - $opt{uid_range} = \@r; # these are fed to Xapian and SQLite + $sth->execute($ibx_id, $beg, $end); + $r[1] = $sth->fetchrow_array; + if (defined($r[1]) && defined($r[0])) { + $opt{limit} = $r[1] - $r[0] + 1; + } else { + $r[1] //= $self->{es}->xdb->get_lastdocid; + $r[0] //= 0; } - $self->{es}->mset($str, \%opt); + $opt{uid_range} = \@r; # these are fed to Xapian and SQLite + \%opt; +} + +sub mset { + my ($self, $str, $opt) = @_; + $self->{es}->mset($str, eidx_mset_prep $self, $opt); +} + +sub async_mset { + my ($self, $str, $opt, $cb, @args) = @_; + $opt = eidx_mset_prep $self, $opt; + $self->{es}->async_mset($str, $opt, $cb, @args); } sub mset_to_artnums { diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 81f940fe..c5146428 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -9,7 +9,7 @@ package PublicInbox::LEI; use v5.12; use parent qw(PublicInbox::DS 
PublicInbox::LeiExternal PublicInbox::LeiQuery); -use autodie qw(bind chdir fork open pipe socket socketpair syswrite unlink); +use autodie qw(bind chdir open pipe socket socketpair syswrite unlink); use Getopt::Long (); use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un); use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET); @@ -22,8 +22,10 @@ use PublicInbox::Syscall qw(EPOLLIN); use PublicInbox::Spawn qw(run_wait popen_rd run_qx); use PublicInbox::Lock; use PublicInbox::Eml; +use PublicInbox::Git qw(git_exe); use PublicInbox::Import; use PublicInbox::ContentHash qw(git_sha); +use PublicInbox::OnDestroy; use PublicInbox::IPC; use Time::HiRes qw(stat); # ctime comparisons for config cache use File::Path (); @@ -176,6 +178,7 @@ our %CMD = ( # sorted in order of importance/use: 'stdin|', # /|\z/ must be first for lone dash @lxs_opt, @net_opt, qw(save! output|mfolder|o=s format|f=s dedupe|d=s threads|t+ + thread-id|T=s sort|s=s reverse|r offset=i pretty jobs|j=s globoff|g augment|a import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+ shared color! mail-sync!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], @@ -504,12 +507,12 @@ sub x_it ($$) { sub err ($;@) { my $self = shift; - my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{GLOB}; + my $err = $self->{2} // ($self->{pgr} // [])->[2] // \*STDERR; my @eor = (substr($_[-1]//'', -1, 1) eq "\n" ? () : ("\n")); print $err @_, @eor and return; my $old_err = delete $self->{2}; $old_err->close if $! 
== EPIPE && $old_err; - $err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB}; + $err = $self->{2} = ($self->{pgr} // [])->[2] // \*STDERR; print $err @_, @eor or print STDERR @_, @eor; } @@ -631,9 +634,8 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die sub pkt_op_pair { my ($self) = @_; - require PublicInbox::OnDestroy; require PublicInbox::PktOp; - my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self); + my $end = on_destroy \&_delete_pkt_op, $self; @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair; $end; } @@ -727,8 +729,6 @@ sub optparse ($$$) { require PublicInbox::LeiInput; my @err = PublicInbox::LeiInput::vmd_mod_extract($self, $argv); return $self->fail(join("\n", @err)) if @err; - } else { - warn "proto $proto\n" if $cmd =~ /(add-watch|tag|index)/; } my $i = 0; @@ -1099,7 +1099,7 @@ sub path_to_fd { # caller needs to "-t $self->{1}" to check if tty sub start_pager { my ($self, $new_env) = @_; - chomp(my $pager = run_qx([qw(git var GIT_PAGER)])); + chomp(my $pager = run_qx([git_exe, qw(var GIT_PAGER)])); warn "`git var PAGER' error: \$?=$?" if $?; return if $pager eq 'cat' || $pager eq ''; $new_env //= {}; @@ -1357,7 +1357,7 @@ sub lazy_start { STDIN->autoflush(1); dump_and_clear_log(); POSIX::setsid() > 0 or die "setsid: $!"; - my $pid = fork; + my $pid = PublicInbox::OnDestroy::fork_tmp; return if $pid; $0 = "lei-daemon $path"; local (%PATH2CFG, $MDIR2CFGPATH); @@ -1444,7 +1444,7 @@ sub wq_eof { # EOF callback for main daemon my ($lei, $wq_fld) = @_; local $current_lei = $lei; my $wq = delete $lei->{$wq_fld // 'wq1'}; - $lei->sto_done_request($wq); + $lei->sto_barrier_request($wq); $wq // $lei->fail; # already failed } @@ -1549,7 +1549,7 @@ sub lms { (-f $f || $creat) ? 
PublicInbox::LeiMailSync->new($f) : undef; } -sub sto_done_request { +sub sto_barrier_request { my ($lei, $wq) = @_; return unless $lei->{sto} && $lei->{sto}->{-wq_s1}; local $current_lei = $lei; @@ -1557,9 +1557,9 @@ sub sto_done_request { eval { $lei->{sto}->wq_do('schedule_commit', $n) }; } else { my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock}; - my $errfh = $lei->{2} // *STDERR{GLOB}; + my $errfh = $lei->{2} // \*STDERR; my @io = $s ? ($errfh, $s) : ($errfh); - eval { $lei->{sto}->wq_io_do('done', \@io) }; + eval { $lei->{sto}->wq_io_do('barrier', \@io, 1) }; } warn($@) if $@; } diff --git a/lib/PublicInbox/LeiALE.pm b/lib/PublicInbox/LeiALE.pm index 528de22c..ce03f5b4 100644 --- a/lib/PublicInbox/LeiALE.pm +++ b/lib/PublicInbox/LeiALE.pm @@ -11,6 +11,7 @@ use parent qw(PublicInbox::LeiSearch PublicInbox::Lock); use PublicInbox::Git; use autodie qw(close open rename seek truncate); use PublicInbox::Import; +use PublicInbox::OnDestroy; use PublicInbox::LeiXSearch; use Fcntl qw(SEEK_SET); @@ -41,11 +42,11 @@ sub over {} # undef for xoids_for sub overs_all { # for xoids_for (called only in lei workers?) 
my ($self) = @_; - my $pid = $$; - if (($self->{owner_pid} // $pid) != $pid) { + my $fgen = $PublicInbox::OnDestroy::fork_gen ; + if (($self->{fgen} // $fgen) != $fgen) { delete($_->{over}) for @{$self->{ibxish}}; } - $self->{owner_pid} = $pid; + $self->{fgen} = $fgen; grep(defined, map { $_->over } @{$self->{ibxish}}); } diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index 127cc81e..31936c36 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -10,14 +10,14 @@ use parent qw(PublicInbox::IPC); use PublicInbox::Spawn qw(run_wait run_qx which); use PublicInbox::DS; use PublicInbox::Eml; -use PublicInbox::Git; +use PublicInbox::Git qw(git_exe); use PublicInbox::IO qw(read_all); sub get_git_dir ($$) { my ($lei, $d) = @_; return $d if -d "$d/objects" && -d "$d/refs" && -e "$d/HEAD"; - my $cmd = [ qw(git rev-parse --git-dir) ]; + my $cmd = [ git_exe, qw(rev-parse --git-dir) ]; my $opt = { '-C' => $d }; if (defined($lei->{opt}->{cwd})) { # --cwd used, report errors $opt->{2} = $lei->{2}; @@ -36,14 +36,13 @@ sub solver_user_cb { # called by solver when done ref($res) eq 'ARRAY' or return $lei->child_error(0, $$log_buf); $lei->qerr($$log_buf); my ($git, $oid, $type, $size, $di) = @$res; - my $gd = $git->{git_dir}; # don't try to support all the git-show(1) options for non-blob, # this is just a convenience: - $type ne 'blob' and - warn "# $oid is a $type of $size bytes in:\n#\t$gd\n"; - - my $cmd = [ 'git', "--git-dir=$gd", 'show', $oid ]; + $type ne 'blob' and warn <<EOM; +# $oid is a $type of $size bytes in:\n#\t$git->{git_dir} +EOM + my $cmd = $git->cmd('show', $oid); my $rdr = { 1 => $lei->{1}, 2 => $lei->{2} }; run_wait($cmd, $lei->{env}, $rdr) and $lei->child_error($?); } @@ -119,14 +118,17 @@ sub lei_blob { } else { open $rdr->{2}, '>', '/dev/null' or die "open: $!"; } - my $cmd = [ 'git', '--git-dir='.$lei->ale->git->{git_dir}, - 'cat-file', 'blob', $blob ]; + my $cmd = $lei->ale->git->cmd('cat-file', 'blob', $blob); + my 
$cerr; if (defined $lei->{-attach_idx}) { my $buf = run_qx($cmd, $lei->{env}, $rdr); return extract_attach($lei, $blob, \$buf) unless $?; + $cerr = $?; + } else { + $rdr->{1} = $lei->{1}; # write directly to client + $cerr = run_wait($cmd, $lei->{env}, $rdr) or return; } - $rdr->{1} = $lei->{1}; - my $cerr = run_wait($cmd, $lei->{env}, $rdr) or return; + # fall back to unimported ('lei index') and inflight blobs my $lms = $lei->lms; my $bref = ($lms ? $lms->local_blob($blob, 1) : undef) // do { my $sto = $lei->{sto} // $lei->_lei_store; diff --git a/lib/PublicInbox/LeiConfig.pm b/lib/PublicInbox/LeiConfig.pm index a50ff2b6..ae12249f 100644 --- a/lib/PublicInbox/LeiConfig.pm +++ b/lib/PublicInbox/LeiConfig.pm @@ -3,6 +3,7 @@ package PublicInbox::LeiConfig; # subclassed by LeiEditSearch use v5.12; use PublicInbox::PktOp; +use PublicInbox::Git qw(git_exe); use Fcntl qw(SEEK_SET); use autodie qw(open seek); use PublicInbox::IO qw(read_all); @@ -11,7 +12,7 @@ sub cfg_do_edit ($;$) { my ($self, $reason) = @_; my $lei = $self->{lei}; $lei->pgr_err($reason) if defined $reason; - my $cmd = [ qw(git config --edit -f), $self->{-f} ]; + my $cmd = [ git_exe, qw(config --edit -f), $self->{-f} ]; my $env = { GIT_CONFIG => $self->{-f} }; $self->cfg_edit_begin if $self->can('cfg_edit_begin'); # run in script/lei foreground diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index d003d983..0a6aba82 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -5,6 +5,7 @@ package PublicInbox::LeiInput; use v5.12; use PublicInbox::DS; +use PublicInbox::Git qw(git_exe); use PublicInbox::Spawn qw(which popen_rd); use PublicInbox::InboxWritable qw(eml_from_path); @@ -252,7 +253,8 @@ sub input_path_url { each_ibx_eml($self, $esrch, @args); } elsif ($self->{missing_ok} && !-e $input) { # don't ->fail if ($lei->{cmd} eq 'p2q') { - my $fp = [ qw(git format-patch --stdout -1), $input ]; + my $fp = [ git_exe, qw(format-patch --stdout -1), + $input ]; my 
$rdr = { 2 => $lei->{2} }; my $fh = popen_rd($fp, undef, $rdr); eval { $self->input_fh('eml', $fh, $input, @args) }; @@ -499,7 +501,7 @@ sub process_inputs { } # always commit first, even on error partial work is acceptable for # lei <import|tag|convert> - $self->{lei}->sto_done_request; + $self->{lei}->sto_barrier_request; $self->{lei}->fail($err) if $err; } diff --git a/lib/PublicInbox/LeiMailDiff.pm b/lib/PublicInbox/LeiMailDiff.pm index af6ecf82..270de507 100644 --- a/lib/PublicInbox/LeiMailDiff.pm +++ b/lib/PublicInbox/LeiMailDiff.pm @@ -7,13 +7,14 @@ package PublicInbox::LeiMailDiff; use v5.12; use parent qw(PublicInbox::IPC PublicInbox::LeiInput PublicInbox::MailDiff); use PublicInbox::Spawn qw(run_wait); +use PublicInbox::Git qw(git_exe); require PublicInbox::LeiRediff; sub diff_a ($$) { my ($self, $eml) = @_; my $dir = "$self->{tmp}/N".(++$self->{nr}); $self->dump_eml($dir, $eml); - my $cmd = [ qw(git diff --no-index) ]; + my $cmd = [ git_exe, qw(diff --no-index) ]; my $lei = $self->{lei}; PublicInbox::LeiRediff::_lei_diff_prepare($lei, $cmd); push @$cmd, qw(-- a), "N$self->{nr}"; diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index 5353ae61..e7c265bd 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -24,6 +24,7 @@ use POSIX qw(strftime); use PublicInbox::Admin qw(fmt_localtime); use autodie qw(chdir chmod close open pipe readlink seek symlink sysopen sysseek truncate unlink); +use PublicInbox::Git qw(git_exe); our $LIVE; # pid => callback our $FGRP_TODO; # objstore -> [[ to resume ], [ to clone ]] @@ -105,7 +106,7 @@ E: confused by scraping <$uri>, got ambiguous results: sub clone_cmd { my ($lei, $opt) = @_; - my @cmd = qw(git); + my @cmd = (git_exe); $opt->{$_} = $lei->{$_} for (0..2); # we support "-c $key=$val" for arbitrary git config options # e.g.: git -c http.proxy=socks5h://127.0.0.1:9050 @@ -291,9 +292,9 @@ sub upr { # feed `git update-ref --stdin -z' verbosely sub start_update_ref { my 
($fgrp) = @_; pipe(my $r, my $w); - my $cmd = [ 'git', "--git-dir=$fgrp->{cur_dst}", + my $cmd = [ git_exe, "--git-dir=$fgrp->{cur_dst}", qw(update-ref --stdin -z) ]; - my $pack = PublicInbox::OnDestroy->new($$, \&satellite_done, $fgrp); + my $pack = on_destroy \&satellite_done, $fgrp; start_cmd($fgrp, $cmd, { 0 => $r, 2 => $fgrp->{lei}->{2} }, $pack); close $r; $fgrp->{dry_run} ? undef : $w; @@ -353,7 +354,7 @@ sub satellite_done { sub pack_refs { my ($self, $git_dir) = @_; - my $cmd = [ 'git', "--git-dir=$git_dir", qw(pack-refs --all --prune) ]; + my $cmd = [git_exe, "--git-dir=$git_dir", qw(pack-refs --all --prune)]; start_cmd($self, $cmd, { 2 => $self->{lei}->{2} }); } @@ -373,18 +374,16 @@ sub fgrpv_done { for my $fgrp (@$fgrpv) { my $rn = $fgrp->{-remote}; my %opt = ( 2 => $fgrp->{lei}->{2} ); - - my $update_ref = PublicInbox::OnDestroy->new($$, - \&fgrp_update, $fgrp); - - my $src = [ 'git', "--git-dir=$fgrp->{-osdir}", 'for-each-ref', + my $update_ref = on_destroy \&fgrp_update, $fgrp; + my $src = [ git_exe, "--git-dir=$fgrp->{-osdir}", + 'for-each-ref', "--format=refs/%(refname:lstrip=3)%00%(objectname)", "refs/remotes/$rn/" ]; open(my $sfh, '+>', undef); $fgrp->{srcfh} = $sfh; start_cmd($fgrp, $src, { %opt, 1 => $sfh }, $update_ref); - my $dst = [ 'git', "--git-dir=$fgrp->{cur_dst}", 'for-each-ref', - '--format=%(refname)%00%(objectname)' ]; + my $dst = [ git_exe, "--git-dir=$fgrp->{cur_dst}", + 'for-each-ref', '--format=%(refname)%00%(objectname)' ]; open(my $dfh, '+>', undef); $fgrp->{dstfh} = $dfh; start_cmd($fgrp, $dst, { %opt, 1 => $dfh }, $update_ref); @@ -402,7 +401,7 @@ sub fgrp_fetch_all { # system argv limits: my $grp = 'fgrptmp'; - my @git = (@{$self->{-torsocks}}, 'git'); + my @git = (@{$self->{-torsocks}}, git_exe); my $j = $self->{lei}->{opt}->{jobs}; my $opt = {}; my @fetch = do { @@ -416,7 +415,7 @@ sub fgrp_fetch_all { my ($old, $new) = @$fgrp_old_new; @$old = sort { $b->{-sort} <=> $a->{-sort} } @$old; # $new is ordered by {references} - 
my $cmd = ['git', "--git-dir=$osdir", qw(config -f), $f ]; + my $cmd = [ git_exe, "--git-dir=$osdir", qw(config -f), $f ]; # clobber settings from previous run atomically for ("remotes.$grp", 'fetch.hideRefs') { @@ -467,7 +466,7 @@ sub fgrp_fetch_all { } $cmd = [ @git, "--git-dir=$osdir", @fetch, $grp ]; push @$old, @$new; - my $end = PublicInbox::OnDestroy->new($$, \&fgrpv_done, $old); + my $end = on_destroy \&fgrpv_done, $old; start_cmd($self, $cmd, $opt, $end); } } @@ -544,7 +543,7 @@ sub cmp_fp_do { return if $cur_ent->{fingerprint} eq $new; } my $dst = $self->{cur_dst} // $self->{dst}; - my $cmd = ['git', "--git-dir=$dst", 'show-ref']; + my $cmd = [git_exe, "--git-dir=$dst", 'show-ref']; my $opt = { 2 => $self->{lei}->{2} }; open($opt->{1}, '+>', undef); $self->{-show_ref} = $opt->{1}; @@ -558,7 +557,7 @@ sub resume_fetch { my ($self, $uri, $fini) = @_; return if !keep_going($self); my $dst = $self->{cur_dst} // $self->{dst}; - my @git = ('git', "--git-dir=$dst"); + my @git = (git_exe, "--git-dir=$dst"); my $opt = { 2 => $self->{lei}->{2} }; my $rn = 'random'.int(rand(1 << 30)); for ("url=$uri", "fetch=+refs/*:refs/*", 'mirror=true') { @@ -567,7 +566,7 @@ sub resume_fetch { my $cmd = [ @{$self->{-torsocks}}, @git, fetch_args($self->{lei}, $opt), $rn ]; push @$cmd, '-P' if $self->{lei}->{prune}; # --prune-tags implied - my $run_puh = PublicInbox::OnDestroy->new($$, \&run_puh, $self, $fini); + my $run_puh = on_destroy \&run_puh, $self, $fini; ++$self->{chg}->{nr_chg}; start_cmd($self, $cmd, $opt, $run_puh); } @@ -599,7 +598,7 @@ sub clone_v1 { return; } } - my $fini = PublicInbox::OnDestroy->new($$, \&v1_done, $self); + my $fini = on_destroy \&v1_done, $self; if (my $fgrp = forkgroup_prep($self, $uri)) { $fgrp->{-fini} = $fini; if ($resume) { @@ -621,8 +620,8 @@ sub clone_v1 { } } ++$self->{chg}->{nr_chg}; - start_cmd($self, $cmd, $opt, PublicInbox::OnDestroy->new($$, - \&run_puh, $self, $fini)); + start_cmd($self, $cmd, $opt, + on_destroy(\&run_puh, $self, 
$fini)); } if (!$self->{-is_epoch} && $lei->{opt}->{'inbox-config'} =~ /\A(?:always|v1)\z/s && @@ -737,7 +736,7 @@ sub atomic_write ($$$) { sub run_next_puh { my ($self) = @_; my $puh = shift @{$self->{-puh_todo}} // return delete($self->{-fini}); - my $fini = PublicInbox::OnDestroy->new($$, \&run_next_puh, $self); + my $fini = on_destroy \&run_next_puh, $self; my $cmd = [ @$puh, ($self->{cur_dst} // $self->{dst}) ]; my $opt = +{ map { $_ => $self->{lei}->{$_} } (0..2) }; start_cmd($self, $cmd, undef, $opt, $fini); @@ -758,18 +757,18 @@ sub update_ent { my $cur = $self->{-local_manifest}->{$key}->{fingerprint} // "\0"; my $dst = $self->{cur_dst} // $self->{dst}; if (defined($new) && $new ne $cur) { - my $cmd = ['git', "--git-dir=$dst", 'show-ref']; + my $cmd = [git_exe, "--git-dir=$dst", 'show-ref']; my $opt = { 2 => $self->{lei}->{2} }; open($opt->{1}, '+>', undef); $self->{-show_ref_up} = $opt->{1}; - my $done = PublicInbox::OnDestroy->new($$, \&up_fp_done, $self); + my $done = on_destroy \&up_fp_done, $self; start_cmd($self, $cmd, $opt, $done); } $new = $self->{-ent}->{head}; $cur = $self->{-local_manifest}->{$key}->{head} // "\0"; if (defined($new) && $new ne $cur) { # n.b. 
grokmirror writes raw contents to $dst/HEAD w/o locking - my $cmd = [ 'git', "--git-dir=$dst" ]; + my $cmd = [ git_exe, "--git-dir=$dst" ]; if ($new =~ s/\Aref: //) { push @$cmd, qw(symbolic-ref HEAD), $new; } elsif ($new =~ /\A[a-f0-9]{40,}\z/) { @@ -814,7 +813,8 @@ sub update_ent { $cur = $self->{-local_manifest}->{$key}->{owner} // "\0"; return if $cur eq $new; utf8::encode($new); # to octets - my $cmd = [ qw(git config -f), "$dst/config", 'gitweb.owner', $new ]; + my $cmd = [ git_exe, qw(config -f), "$dst/config", + 'gitweb.owner', $new ]; start_cmd($self, $cmd, { 2 => $self->{lei}->{2} }); } @@ -856,7 +856,7 @@ sub v2_done { # called via OnDestroy my $dst = $self->{cur_dst} // $self->{dst}; require PublicInbox::Lock; my $lk = PublicInbox::Lock->new("$dst/inbox.lock"); - my $lck = $lk->lock_for_scope($$); + my $lck = $lk->lock_for_scope; _write_inbox_config($self); require PublicInbox::MultiGit; my $mg = PublicInbox::MultiGit->new($dst, 'all.git', 'git'); @@ -883,7 +883,7 @@ sub clone_v2_prep ($$;$) { my $want = parse_epochs($lei->{opt}->{epoch}, $v2_epochs); my $task = $m ? 
bless { %$self }, __PACKAGE__ : $self; my (@skip, $desc); - my $fini = PublicInbox::OnDestroy->new($$, \&v2_done, $task); + my $fini = on_destroy \&v2_done, $task; for my $nr (sort { $a <=> $b } keys %$v2_epochs) { my ($uri, $key) = @{$v2_epochs->{$nr}}; my $src = $uri->as_string; @@ -1018,7 +1018,7 @@ sub clone_all { my ($self, $m) = @_; my $todo = $TODO; $TODO = \'BUG on further use'; - my $end = PublicInbox::OnDestroy->new($$, \&fgrp_fetch_all, $self); + my $end = on_destroy \&fgrp_fetch_all, $self; { my $nodep = delete $todo->{''}; diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm index 610adb78..68faa016 100644 --- a/lib/PublicInbox/LeiP2q.pm +++ b/lib/PublicInbox/LeiP2q.pm @@ -189,7 +189,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point sub ipc_atfork_child { my ($self) = @_; PublicInbox::LeiInput::input_only_atfork_child($self); - PublicInbox::OnDestroy->new($$, \&emit_query, $self); + on_destroy \&emit_query, $self; } no warnings 'once'; diff --git a/lib/PublicInbox/LeiRediff.pm b/lib/PublicInbox/LeiRediff.pm index 35728330..66359dd4 100644 --- a/lib/PublicInbox/LeiRediff.pm +++ b/lib/PublicInbox/LeiRediff.pm @@ -119,17 +119,16 @@ EOM map { $_->git_path('objects')."\n" } @{$self->{gits}}; $rw = PublicInbox::Git->new($d); } - my $w = popen_wr(['git', "--git-dir=$rw->{git_dir}", - qw(fast-import --quiet --done --date-format=raw)], + my $w = popen_wr($rw->cmd(qw(fast-import + --quiet --done --date-format=raw)), $lei->{env}, { 2 => $lei->{2} }); print $w $ta, "\n", $tb, "\ndone\n" or die "print fast-import: $!"; $w->close or die "close w fast-import: \$?=$? 
\$!=$!"; - my $cmd = [ 'diff' ]; + my $cmd = $rw->cmd('diff'); _lei_diff_prepare($lei, $cmd); - $lei->qerr("# git @$cmd"); + $lei->qerr("# git @$cmd[2..$#$cmd]"); push @$cmd, qw(A B); - unshift @$cmd, 'git', "--git-dir=$rw->{git_dir}"; run_wait($cmd, $lei->{env}, { 2 => $lei->{2}, 1 => $lei->{1} }) and $lei->child_error($?); # for git diff --exit-code undef; diff --git a/lib/PublicInbox/LeiRefreshMailSync.pm b/lib/PublicInbox/LeiRefreshMailSync.pm index a60a9a5e..dde23274 100644 --- a/lib/PublicInbox/LeiRefreshMailSync.pm +++ b/lib/PublicInbox/LeiRefreshMailSync.pm @@ -60,7 +60,7 @@ sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url $self->folder_missing($$uri); } } else { die "BUG: $input not supported" } - $self->{lei}->sto_done_request; + $self->{lei}->sto_barrier_request; } sub lei_refresh_mail_sync { diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm index 559fb8d5..d6fc40a4 100644 --- a/lib/PublicInbox/LeiRemote.pm +++ b/lib/PublicInbox/LeiRemote.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # Make remote externals HTTP(S) inboxes behave like @@ -51,7 +51,7 @@ sub mset { $fh = IO::Uncompress::Gunzip->new($fh, MultiStream=>1, AutoClose=>1); eval { PublicInbox::MboxReader->mboxrd($fh, \&each_mboxrd_eml, $self) }; my $err = $@ ? ": $@" : ''; - my $wait = $self->{lei}->{sto}->wq_do('done'); + my $wait = $self->{lei}->{sto}->wq_do('barrier'); $lei->child_error($?, "@$cmd failed$err") if $err || $?; $self; # we are the mset (and $ibx, and $self) } @@ -67,9 +67,16 @@ sub base_url { "$_[0]->{uri}" } sub smsg_eml { my ($self, $smsg) = @_; - if (my $bref = $self->{lei}->ale->git->cat_file($smsg->{blob})) { - return PublicInbox::Eml->new($bref); - } + my $bref = $self->{lei}->ale->git->cat_file($smsg->{blob}) // do { + my $lms = $self->{lei}->lms; + ($lms ? 
$lms->local_blob($smsg->{blob}, 1) : undef) // do { + my $sto = $self->{lei}->{sto} // + $self->{lei}->_lei_store; + $sto && $sto->{-wq_s1} ? + $sto->wq_do('cat_blob', $smsg->{blob}) : undef; + } + }; + return PublicInbox::Eml->new($bref) if $bref; warn("E: $self->{uri} $smsg->{blob} gone <$smsg->{mid}>\n"); undef; } diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm index 9ae9dcdb..c27b6f86 100644 --- a/lib/PublicInbox/LeiSavedSearch.pm +++ b/lib/PublicInbox/LeiSavedSearch.pm @@ -5,7 +5,7 @@ package PublicInbox::LeiSavedSearch; use v5.12; use parent qw(PublicInbox::Lock); -use PublicInbox::Git; +use PublicInbox::Git qw(git_exe); use PublicInbox::OverIdx; use PublicInbox::LeiSearch; use PublicInbox::Config; @@ -176,7 +176,7 @@ sub description { $_[0]->{qstr} } # for WWW sub cfg_set { # called by LeiXSearch my ($self, @args) = @_; my $lk = $self->lock_for_scope; # git-config doesn't wait - run_die([qw(git config -f), $self->{'-f'}, @args]); + run_die([git_exe, qw(config -f), $self->{'-f'}, @args]); } # drop-in for LeiDedupe API @@ -263,8 +263,6 @@ sub reset_dedupe { sub mm { undef } -sub altid_map { {} } - sub cloneurl { [] } # find existing directory containing a `lei.saved-search' file based on diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index a752174d..b2da2bc3 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -28,6 +28,7 @@ use PublicInbox::Spawn qw(spawn); use PublicInbox::MdirReader; use PublicInbox::LeiToMail; use PublicInbox::Compat qw(uniqstr); +use PublicInbox::OnDestroy; use File::Temp qw(tmpnam); use POSIX (); use IO::Handle (); # ->autoflush @@ -80,7 +81,7 @@ sub importer { delete $self->{im}; $im->done; undef $im; - $self->checkpoint; + $self->barrier; $max = $self->{priv_eidx}->{mg}->git_epochs + 1; } my (undef, $tl) = eidx_init($self); # acquire lock @@ -117,7 +118,7 @@ sub cat_blob { sub schedule_commit { my ($self, $sec) = @_; - 
add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self); + add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self); } # follows the stderr file @@ -135,7 +136,7 @@ sub eidx_init { my ($self) = @_; my $eidx = $self->{priv_eidx}; my $tl = wantarray && $self->{-err_wr} ? - PublicInbox::OnDestroy->new($$, \&_tail_err, $self) : + on_destroy(\&_tail_err, $self) : undef; $eidx->idx_init({-private => 1}); # acquires lock wantarray ? ($eidx, $tl) : $eidx; @@ -390,7 +391,7 @@ sub reindex_done { my ($self) = @_; my ($eidx, $tl) = eidx_init($self); $eidx->git->async_wait_all; - # ->done to be called via sto_done_request + # ->done to be called via sto_barrier_request } sub add_eml { @@ -570,13 +571,11 @@ sub set_xvmd { sto_export_kw($self, $smsg->{num}, $vmd); } -sub checkpoint { - my ($self, $wait) = @_; - if (my $im = $self->{im}) { - $wait ? $im->barrier : $im->checkpoint; - } - delete $self->{lms}; - $self->{priv_eidx}->checkpoint($wait); +sub check_done { + my ($self) = @_; + $self->git->_active ? 
+ add_uniq_timer("$self-check_done", 5, \&check_done, $self) : + done($self); } sub xchg_stderr { @@ -593,23 +592,33 @@ sub xchg_stderr { undef; } -sub done { - my ($self) = @_; - my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request +sub _commit ($$) { + my ($self, $cmd) = @_; # cmd is 'done' or 'barrier' + my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request my @err; - if (my $im = delete($self->{im})) { - eval { $im->done }; - push(@err, "E: import done: $@\n") if $@; + if ($self->{im}) { + eval { $self->{im}->$cmd }; + push(@err, "E: import $cmd: $@\n") if $@; } delete $self->{lms}; - eval { $self->{priv_eidx}->done }; # V2Writable::done - push(@err, "E: priv_eidx done: $@\n") if $@; - print { $errfh // *STDERR{GLOB} } @err; + eval { $self->{priv_eidx}->$cmd }; + push(@err, "E: priv_eidx $cmd: $@\n") if $@; + print { $errfh // \*STDERR } @err; send($lei_sock, 'child_error 256', 0) if @err && $lei_sock; xchg_stderr($self); die @err if @err; + # $lei_sock goes out-of-scope and script/lei can terminate } +sub barrier { + my ($self) = @_; + _commit $self, 'barrier'; + add_uniq_timer("$self-check_done", 5, \&check_done, $self); + undef; +} + +sub done { _commit $_[0], 'done' } + sub ipc_atfork_child { my ($self) = @_; my $lei = $self->{lei}; diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index 320b0355..da8caeb7 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -1,12 +1,12 @@ -# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # handles "lei tag" command package PublicInbox::LeiTag; -use strict; -use v5.10.1; +use v5.12; use parent qw(PublicInbox::IPC PublicInbox::LeiInput); use PublicInbox::InboxWritable qw(eml_from_path); +use PublicInbox::OnDestroy; sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh my ($self, $eml) = @_; @@ -49,7 +49,7 @@ sub 
ipc_atfork_child { PublicInbox::LeiInput::input_only_atfork_child($self); $self->{lse} = $self->{lei}->{sto}->search; # this goes out-of-scope at worker process exit: - PublicInbox::OnDestroy->new($$, \&note_unimported, $self); + on_destroy \&note_unimported, $self; } # Workaround bash word-splitting s to ['kw', ':', 'keyword' ...] diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index dfae29e9..5481b5e4 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -14,7 +14,7 @@ use PublicInbox::Import; use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use PublicInbox::Syscall qw(rename_noreplace); -use autodie qw(open seek close); +use autodie qw(pipe open seek close); use Carp qw(croak); my %kw2char = ( # Maildir characters @@ -605,7 +605,7 @@ sub _pre_augment_mbox { $lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe'); } if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) { - pipe(my ($r, $w)) or die "pipe: $!"; + pipe(my $r, my $w); $lei->{zpipe} = [ $r, $w ]; $lei->{ovv}->{lock_path} and die 'BUG: unexpected {ovv}->{lock_path}'; @@ -719,16 +719,32 @@ sub do_augment { # slow, runs in wq worker $m->($self, $lei); } +sub post_augment_call ($$$$) { + my ($self, $lei, $m, $post_augment_done) = @_; + eval { $m->($self, $lei) }; + $lei->{post_augment_err} = $@ if $@; # for post_augment_done +} + # fast (spawn compressor or mkdir), runs in same process as pre_augment sub post_augment { - my ($self, $lei, @args) = @_; + my ($self, $lei, $post_augment_done) = @_; $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ..."); - my $wait = $lei->{opt}->{'import-before'} ? 
- $lei->{sto}->wq_do('checkpoint', 1) : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; - $m->($self, $lei, @args); + + # --import-before is only for lei-(q|lcat), not lei-convert + $lei->{opt}->{'import-before'} or + return post_augment_call $self, $lei, $m, $post_augment_done; + + # we can't deal with post_augment until import-before commits: + require PublicInbox::EOFpipe; + my @io = @$lei{qw(2 sock)}; + pipe(my $r, $io[2]); + PublicInbox::EOFpipe->new($r, \&post_augment_call, + $self, $lei, $m, $post_augment_done); + $lei->{sto}->wq_io_do('barrier', \@io); + # _post_augment_* && post_augment_done run when barrier is complete } # called by every single l2m worker process diff --git a/lib/PublicInbox/LeiViewText.pm b/lib/PublicInbox/LeiViewText.pm index c7d72c71..6510b19e 100644 --- a/lib/PublicInbox/LeiViewText.pm +++ b/lib/PublicInbox/LeiViewText.pm @@ -12,6 +12,7 @@ use PublicInbox::View; use PublicInbox::Hval; use PublicInbox::ViewDiff; use PublicInbox::Spawn qw(popen_rd); +use PublicInbox::Git qw(git_exe); use Term::ANSIColor; use POSIX (); use PublicInbox::Address; @@ -72,7 +73,7 @@ sub new { my $self = bless { %{$lei->{opt}}, -colored => \&uncolored }, $cls; $self->{-quote_reply} = 1 if $fmt eq 'reply'; return $self unless $self->{color} //= -t $lei->{1}; - my @cmd = qw(git config -z --includes -l); # reuse normal git config + my @cmd = (git_exe, qw(config -z --includes -l)); # reuse normal git cfg my $r = popen_rd(\@cmd, undef, { 2 => $lei->{2} }); my $cfg = PublicInbox::Config::config_fh_parse($r, "\0", "\n"); if (!$r->close) { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index fc95d401..43dedd10 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -13,7 +13,7 @@ use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); use PublicInbox::Search qw(xap_terms); use PublicInbox::Spawn qw(popen_rd popen_wr which); -use PublicInbox::MID qw(mids); +use 
PublicInbox::MID qw(mids mid_escape); use PublicInbox::Smsg; use PublicInbox::Eml; use PublicInbox::LEI; @@ -22,6 +22,7 @@ use PublicInbox::ContentHash qw(git_sha); use POSIX qw(strftime); use autodie qw(close open read seek truncate); use PublicInbox::Syscall qw($F_SETPIPE_SZ); +use PublicInbox::OnDestroy; sub new { my ($class) = @_; @@ -160,6 +161,8 @@ sub query_one_mset { # for --threads and l2m w/o sort my $can_kw = !!$ibxish->can('msg_keywords'); my $threads = $lei->{opt}->{threads} // 0; my $fl = $threads > 1 ? 1 : undef; + my $mid = $lei->{opt}->{'thread-id'}; + $mo->{threadid} = $over->mid2tid($mid) if defined $mid; my $lss = $lei->{lss}; my $maxk = "external.$dir.maxuid"; # max of previous, so our min my $min = $lss ? ($lss->{-cfg}->{$maxk} // 0) : 0; @@ -339,6 +342,12 @@ print STDERR $_; push @$curl, '-s', '-d', ''; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); $self->{import_sto} = $lei->{sto} if $lei->{opt}->{'import-remote'}; + if (defined(my $mid = $opt->{'thread-id'})) { + $mid = mid_escape($mid); + for my $uri (@$uris) { + $uri->path($uri->path.$mid.'/'); + } + } for my $uri (@$uris) { $lei->{-current_url} = $uri->as_string; my $start = time; @@ -355,7 +364,7 @@ print STDERR $_; $self, $lei, $each_smsg); }; my ($exc, $code) = ($@, $?); - $lei->sto_done_request if delete($self->{-sto_imported}); + $lei->sto_barrier_request if delete($self->{-sto_imported}); die "E: $exc" if $exc && !$code; my $nr = delete $lei->{-nr_remote_eml} // 0; if (!$code) { # don't update if no results, maybe MTA is down @@ -391,7 +400,7 @@ sub query_done { # EOF callback for main daemon delete $lei->{lxs}; ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and warn "BUG: {sto} missing with --mail-sync"; - $lei->sto_done_request; + $lei->sto_barrier_request; $lei->{ovv}->ovv_end($lei); if ($l2m) { # close() calls LeiToMail reap_compress $l2m->finish_output($lei); @@ -420,11 +429,9 @@ sub query_done { # EOF callback for main daemon $lei->dclose; } -sub do_post_augment { +sub 
post_augment_done { # via on_destroy in top-level lei-daemon my ($lei) = @_; - my $l2m = $lei->{l2m} or return; # client disconnected - eval { $l2m->post_augment($lei) }; - my $err = $@; + my $err = delete $lei->{post_augment_err}; if ($err) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill(-POSIX::SIGTERM()); @@ -439,6 +446,12 @@ sub do_post_augment { close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't } +sub do_post_augment { + my ($lei) = @_; + my $l2m = $lei->{l2m} or return; # client disconnected + $l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei)); +} + sub incr_post_augment { # called whenever an l2m shard finishes augment my ($lei) = @_; my $l2m = $lei->{l2m} or return; # client disconnected @@ -459,7 +472,9 @@ sub concurrency { sub start_query ($$) { # always runs in main (lei-daemon) process my ($self, $lei) = @_; local $PublicInbox::LEI::current_lei = $lei; - if ($self->{opt_threads} || ($lei->{l2m} && !$self->{opt_sort})) { + if ($lei->{opt}->{threads} || + defined($lei->{opt}->{'thread-id'}) || + ($lei->{l2m} && !$lei->{opt}->{'sort'})) { for my $ibxish (locals($self)) { $self->wq_io_do('query_one_mset', [], $ibxish); } @@ -546,8 +561,6 @@ sub do_query { my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); - $self->{opt_threads} = $lei->{opt}->{threads}; - $self->{opt_sort} = $lei->{opt}->{'sort'}; $self->{-do_lcat} = !!(delete $lei->{lcat_todo}); if ($l2m) { $l2m->net_merge_all_done($lei) unless $lei->{auth}; diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm index ddaf3312..7162d80e 100644 --- a/lib/PublicInbox/Lock.pm +++ b/lib/PublicInbox/Lock.pm @@ -41,9 +41,9 @@ sub lock_release { # caller must use return value sub lock_for_scope { - my ($self, @single_pid) = @_; + my ($self) = @_; lock_acquire($self) or return; # lock_path not set - PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self); + on_destroy \&lock_release, $self; } sub lock_acquire_fast { @@ -58,9 +58,9 @@ sub 
lock_release_fast { # caller must use return value sub lock_for_scope_fast { - my ($self, @single_pid) = @_; + my ($self) = @_; lock_acquire_fast($self) or return; # lock_path not set - PublicInbox::OnDestroy->new(@single_pid, \&lock_release_fast, $self); + on_destroy \&lock_release_fast, $self; } 1; diff --git a/lib/PublicInbox/MHreader.pm b/lib/PublicInbox/MHreader.pm index 3e7bbd5c..16e505a2 100644 --- a/lib/PublicInbox/MHreader.pm +++ b/lib/PublicInbox/MHreader.pm @@ -52,7 +52,7 @@ EOM sub mh_each_file { my ($self, $efcb, @arg) = @_; opendir(my $dh, my $dir = $self->{dir}); - my $restore = PublicInbox::OnDestroy->new($$, \&chdir, $self->{cwdfh}); + my $restore = on_destroy \&chdir, $self->{cwdfh}; chdir($dh); my $sort = $self->{sort}; if (defined $sort && "@$sort" ne 'none') { @@ -96,7 +96,7 @@ sub mh_each_eml { sub mh_read_one { my ($self, $n, $ucb, @arg) = @_; - my $restore = PublicInbox::OnDestroy->new($$, \&chdir, $self->{cwdfh}); + my $restore = on_destroy \&chdir, $self->{cwdfh}; chdir(my $dir = $self->{dir}); _file2eml($dir, $n, $self, $ucb, @arg); } diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm index 125360fe..ce268bfb 100644 --- a/lib/PublicInbox/MailDiff.pm +++ b/lib/PublicInbox/MailDiff.pm @@ -7,6 +7,7 @@ use PublicInbox::ContentHash qw(content_digest); use PublicInbox::MsgIter qw(msg_part_text); use PublicInbox::ViewDiff qw(flush_diff); use PublicInbox::GitAsyncCat; +use PublicInbox::Git qw(git_exe); use PublicInbox::ContentDigestDbg; use PublicInbox::Qspawn; use PublicInbox::IO qw(write_file); @@ -81,7 +82,7 @@ sub do_diff { my $n = 'N'.(++$self->{nr}); my $dir = "$self->{tmp}/$n"; $self->dump_eml($dir, $eml); - my $cmd = [ qw(git diff --no-index --no-color -- a), $n ]; + my $cmd = [ git_exe, qw(diff --no-index --no-color -- a), $n ]; my $opt = { -C => "$self->{tmp}", quiet => 1 }; my $qsp = PublicInbox::Qspawn->new($cmd, undef, $opt); $qsp->psgi_qx($self->{ctx}->{env}, undef, \&emit_msg_diff, $self); diff --git 
a/lib/PublicInbox/ManifestJsGz.pm b/lib/PublicInbox/ManifestJsGz.pm index 1f739baa..be5d5f2a 100644 --- a/lib/PublicInbox/ManifestJsGz.pm +++ b/lib/PublicInbox/ManifestJsGz.pm @@ -82,8 +82,9 @@ sub response { $ctx->can('list_match_i'), $re, $ctx); sub { $ctx->{-wcb} = $_[0]; # HTTP server callback - $ctx->{env}->{'pi-httpd.async'} ? - $iter->event_step : $iter->each_section; + ($ctx->{www}->{pi_cfg}->ALL || + !$ctx->{env}->{'pi-httpd.async'}) ? + $iter->each_section : $iter->event_step; } } diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm index 52f88ae3..17893a09 100644 --- a/lib/PublicInbox/Mbox.pm +++ b/lib/PublicInbox/Mbox.pm @@ -4,7 +4,7 @@ # Streaming interface for mboxrd HTTP responses # See PublicInbox::GzipFilter for details. package PublicInbox::Mbox; -use strict; +use v5.12; use parent 'PublicInbox::GzipFilter'; use PublicInbox::MID qw/mid_escape/; use PublicInbox::Hval qw/to_filename/; @@ -31,8 +31,8 @@ sub async_next { my ($http) = @_; # PublicInbox::HTTP my $ctx = $http->{forward} or return; # client aborted eval { - my $smsg = $ctx->{smsg} or return $ctx->close; - $ctx->smsg_blob($smsg); + my $smsg = $ctx->{smsg} // return $ctx->close; + $ctx->smsg_blob($smsg) if $smsg; }; warn "E: $@" if $@; } @@ -159,6 +159,7 @@ sub all_ids_cb { } $ctx->{ids} = $ids = $over->ids_after(\($ctx->{prev})); } while (@$ids); + undef; } sub mbox_all_ids { @@ -175,56 +176,79 @@ sub mbox_all_ids { PublicInbox::MboxGz::mbox_gz($ctx, \&all_ids_cb, 'all'); } -sub results_cb { - my ($ctx) = @_; - my $over = $ctx->{ibx}->over or return $ctx->gone('over'); - while (1) { - while (defined(my $num = shift(@{$ctx->{ids}}))) { - my $smsg = $over->get_art($num) or next; - return $smsg; - } - # refill result set, deprioritize since there's many results - my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search'); - my $mset = $srch->mset($ctx->{query}, $ctx->{qopts}); - my $size = $mset->size or return; - $ctx->{qopts}->{offset} += $size; - $ctx->{ids} = 
$srch->mset_to_artnums($mset, $ctx->{qopts}); - $ctx->{-low_prio} = 1; +my $refill_ids_cb = sub { # async_mset cb + my ($ctx, $http, $mset, $err) = @_; + $http = undef unless $ctx->{-really_async}; + if ($err) { + warn "E: $err"; + $ctx->close if $http; # our async httpd + return; } -} - -sub results_thread_cb { - my ($ctx) = @_; + # refill result set, deprioritize since there's many results + my $size = $mset->size or do { + $ctx->close if $http; + $ctx->{-mbox_done} = 1; + return; + }; + $ctx->{qopts}->{offset} += $size; + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); + $ctx->{-low_prio} = 1; # true + return if !$http; + eval { + my $smsg = results_cb($ctx) // return $ctx->close; + return if !$smsg; # '' wait for async_mset + $ctx->smsg_blob($ctx->{smsg} = $smsg); + }; + warn "E: $@" if $@; +}; +sub results_cb { # async_next or MboxGz->getline cb + my ($ctx, $http) = @_; my $over = $ctx->{ibx}->over or return $ctx->gone('over'); while (1) { - while (defined(my $num = shift(@{$ctx->{xids}}))) { + my $ids = $ctx->{xids} // $ctx->{ids}; + while (defined(my $num = shift(@$ids))) { my $smsg = $over->get_art($num) or next; return $smsg; } - - # refills ctx->{xids} - next if $over->expand_thread($ctx); - - # refill result set, deprioritize since there's many results - my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search'); - my $mset = $srch->mset($ctx->{query}, $ctx->{qopts}); - my $size = $mset->size or return; - $ctx->{qopts}->{offset} += $size; - $ctx->{ids} = $srch->mset_to_artnums($mset, $ctx->{qopts}); - $ctx->{-low_prio} = 1; + next if $ctx->{xids} && $over->expand_thread($ctx); + return '' if $ctx->{srch}->async_mset(@$ctx{qw(query qopts)}, + $refill_ids_cb, $ctx, $http); + return if $ctx->{-mbox_done}; } +} +sub mbox_qry_cb { # async_mset cb + my ($ctx, $q, $mset, $err) = @_; + my $wcb = delete $ctx->{wcb}; + if ($err) { + warn "E: $err"; + return $wcb->([500, [qw(Content-Type text/plain)], + [ "Internal server error\n" ]]) + } + 
$ctx->{qopts}->{offset} = $mset->size or + return $wcb->([404, [qw(Content-Type text/plain)], + ["No results found\n"]]); + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); + my $fn; + if ($q->{t} && $ctx->{srch}->has_threadid) { + $ctx->{xids} = []; # triggers over->expand_thread + $fn = "results-thread-$ctx->{query}"; + } else { + $fn = "results-$ctx->{query}"; + } + require PublicInbox::MboxGz; + my $res = PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); } sub mbox_all { my ($ctx, $q) = @_; - my $q_string = $q->{'q'}; - return mbox_all_ids($ctx) if $q_string !~ /\S/; - my $srch = $ctx->{ibx}->isrch or + my $qstr = $q->{'q'}; + return mbox_all_ids($ctx) if $qstr !~ /\S/; + my $srch = $ctx->{srch} = $ctx->{ibx}->isrch or return PublicInbox::WWW::need($ctx, 'Search'); - - my $qopts = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC + my $opt = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC # {threadid} limits results to a given thread # {threads} collapses results from messages in the same thread, @@ -234,25 +258,16 @@ sub mbox_all { $ctx->{ibx}->{isrch}->{es}->over : $ctx->{ibx}->over) or return PublicInbox::WWW::need($ctx, 'Overview'); - $qopts->{threadid} = $over->mid2tid($ctx->{mid}); - } - $qopts->{threads} = 1 if $q->{t}; - $srch->query_approxidate($ctx->{ibx}->git, $q_string); - my $mset = $srch->mset($q_string, $qopts); - $qopts->{offset} = $mset->size or - return [404, [qw(Content-Type text/plain)], - ["No results found\n"]]; - $ctx->{query} = $q_string; - $ctx->{ids} = $srch->mset_to_artnums($mset, $qopts); - require PublicInbox::MboxGz; - my $fn; - if ($q->{t} && $srch->has_threadid) { - $fn = 'results-thread-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_thread_cb, $fn); - } else { - $fn = 'results-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + $opt->{threadid} = $over->mid2tid($ctx->{mid}); } + $opt->{threads} = 1 if 
$q->{t}; + $srch->query_approxidate($ctx->{ibx}->git, $qstr); + $ctx->{query} = $qstr; + sub { # called by PSGI server + $ctx->{wcb} = $_[0]; # PSGI server supplied write cb + $srch->async_mset($qstr, $opt, \&mbox_qry_cb, $ctx, $q) and + $ctx->{-really_async} = 1; + }; } 1; diff --git a/lib/PublicInbox/MboxGz.pm b/lib/PublicInbox/MboxGz.pm index 533d2ff1..90e69c09 100644 --- a/lib/PublicInbox/MboxGz.pm +++ b/lib/PublicInbox/MboxGz.pm @@ -1,7 +1,7 @@ # Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> package PublicInbox::MboxGz; -use strict; +use v5.12; use parent 'PublicInbox::GzipFilter'; use PublicInbox::Eml; use PublicInbox::Hval qw/to_filename/; @@ -13,8 +13,8 @@ sub async_next ($) { my ($http) = @_; # PublicInbox::HTTP my $ctx = $http->{forward} or return; eval { - $ctx->{smsg} = $ctx->{cb}->($ctx) or return $ctx->close; - $ctx->smsg_blob($ctx->{smsg}); + my $smsg = $ctx->{cb}->($ctx, $http) // return $ctx->close; + $smsg and $ctx->smsg_blob($ctx->{smsg} = $smsg); }; warn "E: $@" if $@; } diff --git a/lib/PublicInbox/MboxLock.pm b/lib/PublicInbox/MboxLock.pm index 9d7d4a32..5e373873 100644 --- a/lib/PublicInbox/MboxLock.pm +++ b/lib/PublicInbox/MboxLock.pm @@ -4,7 +4,7 @@ # Various mbox locking methods package PublicInbox::MboxLock; use v5.12; -use PublicInbox::OnDestroy; +use PublicInbox::OnDestroy (); use Fcntl qw(:flock F_SETLK F_SETLKW F_RDLCK F_WRLCK O_CREAT O_EXCL O_WRONLY SEEK_SET); use Carp qw(croak); @@ -122,10 +122,10 @@ sub acq { sub DESTROY { my ($self) = @_; my $f = $self->{".lock$$"} or return; - my $x; + my $od; if (my $dh = delete $self->{dh}) { opendir my $c, '.'; - $x = PublicInbox::OnDestroy->new(\&chdir, $c); + $od = PublicInbox::OnDestroy::all \&chdir, $c; chdir($dh); } CORE::unlink($f) or die "unlink($f): $! 
(lock stolen?)"; diff --git a/lib/PublicInbox/MultiGit.pm b/lib/PublicInbox/MultiGit.pm index b7691806..32bb3588 100644 --- a/lib/PublicInbox/MultiGit.pm +++ b/lib/PublicInbox/MultiGit.pm @@ -7,6 +7,7 @@ use strict; use v5.10.1; use PublicInbox::Spawn qw(run_die run_qx); use PublicInbox::Import; +use PublicInbox::Git qw(git_exe); use File::Temp 0.19; use List::Util qw(max); use PublicInbox::IO qw(read_all); @@ -110,11 +111,12 @@ sub epoch_cfg_set { my ($self, $epoch_nr) = @_; my $f = epoch_dir($self)."/$epoch_nr.git/config"; my $v = "../../$self->{all}/config"; + my @cmd = (git_exe, qw(config -f), $f, 'include.path'); if (-r $f) { - chomp(my $x = run_qx([qw(git config -f), $f, 'include.path'])); + chomp(my $x = run_qx(\@cmd)); return if $x eq $v; } - run_die([qw(git config -f), $f, 'include.path', $v ]); + run_die [@cmd, $v]; } sub add_epoch { diff --git a/lib/PublicInbox/OnDestroy.pm b/lib/PublicInbox/OnDestroy.pm index d9a6cd24..4301edff 100644 --- a/lib/PublicInbox/OnDestroy.pm +++ b/lib/PublicInbox/OnDestroy.pm @@ -3,22 +3,29 @@ package PublicInbox::OnDestroy; use v5.12; +use parent qw(Exporter); +use autodie qw(fork); +our @EXPORT = qw(on_destroy); +our $fork_gen = 0; -sub new { - shift; # ($class, $cb, @args) - bless [ @_ ], __PACKAGE__; +# either parent or child is expected to exit or exec shortly after this: +sub fork_tmp () { + my $pid = fork; + ++$fork_gen if $pid == 0; + $pid; } +# all children +sub all (@) { bless [ undef, @_ ], __PACKAGE__ } + +# same process +sub on_destroy (@) { bless [ $fork_gen, @_ ], __PACKAGE__ } + sub cancel { @{$_[0]} = () } sub DESTROY { - my ($cb, @args) = @{$_[0]}; - if (!ref($cb) && $cb) { - my $pid = $cb; - return if $pid != $$; - $cb = shift @args; - } - $cb->(@args) if $cb; + my ($fgen, $cb, @args) = @{$_[0]}; + $cb->(@args) if ($cb && ($fgen // $fork_gen) == $fork_gen); } 1; diff --git a/lib/PublicInbox/RepoAtom.pm b/lib/PublicInbox/RepoAtom.pm index ab0f2fcc..eb0ed3c7 100644 --- a/lib/PublicInbox/RepoAtom.pm +++ 
b/lib/PublicInbox/RepoAtom.pm @@ -94,11 +94,10 @@ xmlns="http://www.w3.org/1999/xhtml"><pre style="white-space:pre-wrap"> sub srv_tags_atom { my ($ctx) = @_; my $max = 50; # TODO configurable - my @cmd = ('git', "--git-dir=$ctx->{git}->{git_dir}", - qw(for-each-ref --sort=-creatordate), "--count=$max", - '--perl', $EACH_REF_FMT, 'refs/tags'); + my $cmd = $ctx->{git}->cmd(qw(for-each-ref --sort=-creatordate), + "--count=$max", '--perl', $EACH_REF_FMT, 'refs/tags'); $ctx->{-feed_title} = "$ctx->{git}->{nick} tags"; - my $qsp = PublicInbox::Qspawn->new(\@cmd); + my $qsp = PublicInbox::Qspawn->new($cmd); $ctx->{-is_tag} = 1; $qsp->psgi_yield($ctx->{env}, undef, \&atom_ok, $ctx); } @@ -107,20 +106,19 @@ sub srv_atom { my ($ctx, $path) = @_; return if index($path, '//') >= 0 || index($path, '/') == 0; my $max = 50; # TODO configurable - my @cmd = ('git', "--git-dir=$ctx->{git}->{git_dir}", - qw(log --no-notes --no-color --no-abbrev), - $ATOM_FMT, "-$max"); + my $cmd = $ctx->{git}->cmd(qw(log --no-notes --no-color --no-abbrev), + $ATOM_FMT, "-$max"); my $tip = $ctx->{qp}->{h}; # same as cgit $ctx->{-feed_title} = $ctx->{git}->{nick}; $ctx->{-feed_title} .= " $path" if $path ne ''; if (defined($tip)) { - push @cmd, $tip; + push @$cmd, $tip; $ctx->{-feed_title} .= ", $tip"; } # else: let git decide based on HEAD if $tip isn't defined - push @cmd, '--'; - push @cmd, $path if $path ne ''; - my $qsp = PublicInbox::Qspawn->new(\@cmd, undef, + push @$cmd, '--'; + push @$cmd, $path if $path ne ''; + my $qsp = PublicInbox::Qspawn->new($cmd, undef, { quiet => 1, 2 => $ctx->{lh} }); $qsp->psgi_yield($ctx->{env}, undef, \&atom_ok, $ctx); } diff --git a/lib/PublicInbox/RepoSnapshot.pm b/lib/PublicInbox/RepoSnapshot.pm index 4c372569..bff97bc8 100644 --- a/lib/PublicInbox/RepoSnapshot.pm +++ b/lib/PublicInbox/RepoSnapshot.pm @@ -50,15 +50,13 @@ sub ver_check { # git->check_async callback delete($ctx->{env}->{'qspawn.wcb'})->(r(404)); } else { # found, done: $ctx->{etag} = $oid; - my 
@cfg; + my $cmd = $ctx->{git}->cmd; if (my $cmd = $FMT_CFG{$ctx->{snap_fmt}}) { - @cfg = ('-c', "tar.$ctx->{snap_fmt}.command=$cmd"); + push @$cmd, '-c', "tar.$ctx->{snap_fmt}.command=$cmd"; } - my $qsp = PublicInbox::Qspawn->new(['git', @cfg, - "--git-dir=$ctx->{git}->{git_dir}", 'archive', - "--prefix=$ctx->{snap_pfx}/", - "--format=$ctx->{snap_fmt}", $treeish], undef, - { quiet => 1 }); + push @$cmd, 'archive', "--prefix=$ctx->{snap_pfx}/", + "--format=$ctx->{snap_fmt}", $treeish; + my $qsp = PublicInbox::Qspawn->new($cmd, undef, { quiet => 1 }); $qsp->psgi_yield($ctx->{env}, undef, \&archive_hdr, $ctx); } } diff --git a/lib/PublicInbox/RepoTree.pm b/lib/PublicInbox/RepoTree.pm index 5c73531a..4c85f9a8 100644 --- a/lib/PublicInbox/RepoTree.pm +++ b/lib/PublicInbox/RepoTree.pm @@ -51,8 +51,8 @@ sub find_missing { $res->[0] = 404; return delete($ctx->{-wcb})->($res); } - my $cmd = ['git', "--git-dir=$ctx->{git}->{git_dir}", - qw(log --no-color -1), '--pretty=%H %h %s (%as)' ]; + my $cmd = $ctx->{git}->cmd(qw(log --no-color -1), + '--pretty=%H %h %s (%as)'); push @$cmd, $ctx->{qp}->{h} if defined($ctx->{qp}->{h}); push @$cmd, '--'; push @$cmd, $ctx->{-path}; diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 678c8c5d..eb5e67ba 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -11,6 +11,7 @@ our @EXPORT_OK = qw(retry_reopen int_val get_pct xap_terms); use List::Util qw(max); use POSIX qw(strftime); use Carp (); +our $XHC = 0; # defined but false # values for searching, changing the numeric value breaks # compatibility with old indices (so don't change them it) @@ -53,10 +54,13 @@ use constant { # # v1.6.0 adds BYTES, UID and THREADID values SCHEMA_VERSION => 15, + + # we may have up to 8 FDs per shard (depends on Xapian *shrug*) + SHARD_COST => 8, }; use PublicInbox::Smsg; -use PublicInbox::Over; +eval { require PublicInbox::Over }; our $QP_FLAGS; our %X = map { $_ => 0 } qw(BoolWeight Database Enquire QueryParser Stem 
Query); our $Xap; # 'Xapian' or 'Search::Xapian' @@ -85,14 +89,13 @@ our @XH_SPEC = ( 'k=i', # sort column (like sort(1)) 'm=i', # maximum number of results 'o=i', # offset - 'p', # show percent 'r', # 1=relevance then column 't', # collapse threads 'A=s@', # prefixes - 'D', # emit docdata 'K=i', # timeout kill after i seconds 'O=s', # eidx_key 'T=i', # threadid + 'Q=s@', # query prefixes "$user_prefix[:=]$XPREFIX" ); sub load_xapian () { @@ -429,6 +432,68 @@ sub mset { do_enquire($self, $qry, $opt, TS); } +sub xhc_start_maybe (@) { + require PublicInbox::XapClient; + my $xhc = PublicInbox::XapClient::start_helper(@_); + require PublicInbox::XhcMset if $xhc; + $xhc; +} + +sub xh_opt ($$) { + my ($self, $opt) = @_; + my $lim = $opt->{limit} || 50; + my @ret; + push @ret, '-o', $opt->{offset} if $opt->{offset}; + push @ret, '-m', $lim; + my $rel = $opt->{relevance} // 0; + if ($rel == -2) { # ORDER BY docid/UID (highest first) + push @ret, '-k', '-1'; + } elsif ($rel == -1) { # ORDER BY docid/UID (lowest first) + push @ret, '-k', '-1'; + push @ret, '-a'; + } elsif ($rel == 0) { + push @ret, '-k', $opt->{sort_col} // TS; + push @ret, '-a' if $opt->{asc}; + } else { # rel > 0 + push @ret, '-r'; + push @ret, '-k', $opt->{sort_col} // TS; + push @ret, '-a' if $opt->{asc}; + } + push @ret, '-t' if $opt->{threads}; + push @ret, '-T', $opt->{threadid} if defined $opt->{threadid}; + push @ret, '-O', $opt->{eidx_key} if defined $opt->{eidx_key}; + my $apfx = $self->{-alt_pfx} //= do { + my @tmp; + for (grep /\Aserial:/, @{$self->{altid} // []}) { + my (undef, $pfx) = split /:/, $_; + push @tmp, '-Q', "$pfx=X\U$pfx"; + } + # TODO: arbitrary header indexing goes here + \@tmp; + }; + (@ret, @$apfx); +} + +# returns a true value if actually handled asynchronously, +# and a falsy value if handled synchronously +sub async_mset { + my ($self, $qry_str, $opt, $cb, @args) = @_; + if ($XHC) { # unconditionally retrieving pct + rank for now + xdb($self); # populate {nshards} + my @margs 
= ($self->xh_args, xh_opt($self, $opt), '--'); + my $ret = eval { + my $rd = $XHC->mkreq(undef, 'mset', @margs, $qry_str); + PublicInbox::XhcMset->maybe_new($rd, $self, $cb, @args); + }; + $cb->(@args, undef, $@) if $@; + $ret; + } else { # synchronous + my $mset = $self->mset($qry_str, $opt); + $cb->(@args, $mset); + undef; + } +} + sub do_enquire { # shared with CodeSearch my ($self, $qry, $opt, $col) = @_; my $enq = $X{Enquire}->new(xdb($self)); @@ -578,7 +643,7 @@ EOM $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n} } } - # TODO: altid support + # altid support is handled in xh_opt and srch_init_extra in XH for my $name (sort keys %prob_prefix) { for (split(/ /, $prob_prefix{$name})) { $ret .= qq{\tqp->add_prefix("$name", "$_");\n} @@ -667,4 +732,17 @@ sub get_doc ($$) { } } +# not sure where best to put this... +sub ulimit_n () { + my $n; + if (eval { require BSD::Resource; 1 }) { + my $NOFILE = BSD::Resource::RLIMIT_NOFILE(); + ($n, undef) = BSD::Resource::getrlimit($NOFILE); + } else { + require PublicInbox::Spawn; + $n = PublicInbox::Spawn::run_qx([qw(/bin/sh -c), 'ulimit -n']); + } + $n; +} + 1; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 1cbf6d23..4fd493d9 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -119,7 +119,7 @@ sub load_xapian_writable () { my $ver = eval 'v'.join('.', eval($xap.'::major_version()'), eval($xap.'::minor_version()'), eval($xap.'::revision()')); - if ($ver ge 1.4) { # new flags in Xapian 1.4 + if ($ver ge v1.4) { # new flags in Xapian 1.4 $DB_NO_SYNC = 0x4; $DB_DANGEROUS = 0x10; } @@ -1003,8 +1003,7 @@ sub prepare_stack ($$) { sub is_ancestor ($$$) { my ($git, $cur, $tip) = @_; return 0 unless $git->check($cur); - my $cmd = [ 'git', "--git-dir=$git->{git_dir}", - qw(merge-base --is-ancestor), $cur, $tip ]; + my $cmd = $git->cmd(qw(merge-base --is-ancestor), $cur, $tip); run_wait($cmd) == 0; } diff --git a/lib/PublicInbox/SearchIdxShard.pm 
b/lib/PublicInbox/SearchIdxShard.pm index 1630eb4a..ea261bda 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -45,7 +45,7 @@ sub ipc_atfork_child { # called automatically before ipc_worker_loop $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__} $self->begin_txn_lazy; # caller (ipc_worker_spawn) must capture this: - PublicInbox::OnDestroy->new($$, \&_worker_done, $self); + on_destroy \&_worker_done, $self; } sub index_eml { diff --git a/lib/PublicInbox/SearchThread.pm b/lib/PublicInbox/SearchThread.pm index 00ae9fac..672c53ad 100644 --- a/lib/PublicInbox/SearchThread.pm +++ b/lib/PublicInbox/SearchThread.pm @@ -33,19 +33,24 @@ sub thread { # can be shakier if somebody used In-Reply-To with multiple, disparate # messages. So, take the client Date: into account since we can't # always determine ordering when somebody uses multiple In-Reply-To. + my (%dedupe, $mid); my @kids = sort { $a->{ds} <=> $b->{ds} } grep { # this delete saves around 4K across 1K messages # TODO: move this to a more appropriate place, breaks tests # if we do it during psgi_cull delete $_->{num}; bless $_, 'PublicInbox::SearchThread::Msg'; - if (exists $id_table{$_->{mid}}) { + $mid = $_->{mid}; + if (exists $id_table{$mid}) { $_->{children} = []; push @imposters, $_; # we'll deal with them later undef; } else { $_->{children} = {}; # will become arrayref later - $id_table{$_->{mid}} = $_; + %dedupe = ($mid => undef); + ($mid) = keys %dedupe; + $_->{mid} = $mid; + $id_table{$mid} = $_; defined($_->{references}); } } @$msgs; diff --git a/lib/PublicInbox/SearchView.pm b/lib/PublicInbox/SearchView.pm index 2d3e942c..f056dddf 100644 --- a/lib/PublicInbox/SearchView.pm +++ b/lib/PublicInbox/SearchView.pm @@ -30,10 +30,9 @@ sub mbox_results { sub sres_top_html { my ($ctx) = @_; - my $srch = $ctx->{ibx}->isrch or + my $srch = $ctx->{srch} = $ctx->{ibx}->isrch or return PublicInbox::WWW::need($ctx, 'Search'); my $q = 
PublicInbox::SearchQuery->new($ctx->{qp}); - my $x = $q->{x}; my $o = $q->{o} // 0; my $asc; if ($o < 0) { @@ -41,48 +40,57 @@ sub sres_top_html { $o = -($o + 1); # so [-1] is the last element, like Perl lists } - my $code = 200; # double the limit for expanded views: - my $opts = { + my $opt = { limit => $q->{l}, offset => $o, relevance => $q->{r}, threads => $q->{t}, asc => $asc, }; - my ($mset, $total, $err, $html); -retry: - eval { - my $query = $q->{'q'}; - $srch->query_approxidate($ctx->{ibx}->git, $query); - $mset = $srch->mset($query, $opts); - $total = $mset->get_matches_estimated; - }; - $err = $@; + my $qs = $q->{'q'}; + $srch->query_approxidate($ctx->{ibx}->git, $qs); + sub { + $ctx->{wcb} = $_[0]; # PSGI server supplied write cb + $srch->async_mset($qs, $opt, \&sres_html_cb, $ctx, $opt, $q); + } +} + +sub sres_html_cb { # async_mset cb + my ($ctx, $opt, $q, $mset, $err) = @_; + my $code = 200; + my $total = $mset ? $mset->get_matches_estimated : undef; ctx_prepare($q, $ctx); + my ($res, $html); if ($err) { $code = 400; $html = '<pre>'.err_txt($ctx, $err).'</pre><hr>'; } elsif ($total == 0) { - if (defined($ctx->{-uxs_retried})) { - # undo retry damage: + if (defined($ctx->{-uxs_retried})) { # undo retry damage: $q->{'q'} = $ctx->{-uxs_retried}; - } elsif (index($q->{'q'}, '%') >= 0) { + } elsif (index($q->{'q'}, '%') >= 0) { # retry unescaped $ctx->{-uxs_retried} = $q->{'q'}; - $q->{'q'} = uri_unescape($q->{'q'}); - goto retry; + my $qs = $q->{'q'} = uri_unescape($q->{'q'}); + $ctx->{srch}->query_approxidate($ctx->{ibx}->git, $qs); + return $ctx->{srch}->async_mset($qs, $opt, + \&sres_html_cb, $ctx, $opt, $q); } $code = 404; $html = "<pre>\n[No results found]</pre><hr>"; + } elsif ($q->{x} eq 'A') { + $res = adump($mset, $q, $ctx); } else { - return adump($_[0], $mset, $q, $ctx) if $x eq 'A'; - $ctx->{-html_tip} = search_nav_top($mset, $q, $ctx); - return mset_thread($ctx, $mset, $q) if $x eq 't'; - mset_summary($ctx, $mset, $q); # appends to 
{-html_tip} - $html = ''; + if ($q->{x} eq 't') { + $res = mset_thread($ctx, $mset, $q); + } else { + mset_summary($ctx, $mset, $q); # appends to {-html_tip} + $html = ''; + } } - html_oneshot($ctx, $code, $html); + $res //= html_oneshot($ctx, $code, $html); + my $wcb = delete $ctx->{wcb}; + ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); } # display non-nested search results similar to what users expect from @@ -146,9 +154,15 @@ sub path2inc ($) { if (my $short = $rmap_inc{$full}) { return $short; } elsif (!scalar(keys %rmap_inc) && -e $full) { - %rmap_inc = map {; "$INC{$_}" => $_ } keys %INC; + # n.b. $INC{'PublicInbox::Gcf2'} is undef if libgit2-dev + # doesn't exist + my $f; + %rmap_inc = map {; + $f = $INC{$_}; + defined $f ? ($f, $_) : (); + } keys %INC; # fall back to basename as last resort - $rmap_inc{$full} // (split('/', $full))[-1]; + $rmap_inc{$full} // (split(m'/', $full))[-1]; } else { $full; } @@ -302,13 +316,12 @@ sub mset_thread { my $rootset = PublicInbox::SearchThread::thread($msgs, $r ? \&sort_relevance : \&PublicInbox::View::sort_ds, $ctx); - my $skel = search_nav_bot($ctx, $mset, $q).'<pre>'. <<EOM; + $ctx->{skel} = [ search_nav_bot($ctx, $mset, $q).'<pre>'. <<EOM ]; -- pct% links below jump to the message on this page, permalinks otherwise -- EOM $ctx->{-upfx} = ''; $ctx->{anchor_idx} = 1; $ctx->{cur_level} = 0; - $ctx->{skel} = \$skel; $ctx->{mapping} = {}; $ctx->{searchview} = 1; $ctx->{prev_attr} = ''; @@ -318,7 +331,7 @@ EOM # reduce hash lookups in skel_dump $ctx->{-obfs_ibx} = $ibx->{obfuscate} ? 
$ibx : undef; PublicInbox::View::walk_thread($rootset, $ctx, - \&PublicInbox::View::pre_thread); + \&PublicInbox::View::pre_thread); # pushes to ctx->{skel} # link $INBOX_DIR/description text to "recent" view around # the newest message in this result set: @@ -335,7 +348,7 @@ sub mset_thread_i { print { $ctx->zfh } $ctx->html_top if exists $ctx->{-html_tip}; $eml and return PublicInbox::View::eml_entry($ctx, $eml); my $smsg = shift @{$ctx->{msgs}} or - print { $ctx->zfh } ${delete($ctx->{skel})}; + print { $ctx->zfh } @{delete($ctx->{skel})}; $smsg; } @@ -357,7 +370,7 @@ sub ctx_prepare { } sub adump { - my ($cb, $mset, $q, $ctx) = @_; + my ($mset, $q, $ctx) = @_; $ctx->{ids} = $ctx->{ibx}->isrch->mset_to_artnums($mset); $ctx->{search_query} = $q; # used by WwwAtomStream::atom_header PublicInbox::WwwAtomStream->response($ctx, \&adump_i); diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm index 296e7d17..898ca72d 100644 --- a/lib/PublicInbox/SolverGit.pm +++ b/lib/PublicInbox/SolverGit.pm @@ -13,7 +13,7 @@ use v5.10.1; use File::Temp 0.19 (); # 0.19 for ->newdir use autodie qw(mkdir); use Fcntl qw(SEEK_SET); -use PublicInbox::Git qw(git_unquote git_quote); +use PublicInbox::Git qw(git_unquote git_quote git_exe); use PublicInbox::IO qw(write_file); use PublicInbox::MsgIter qw(msg_part_text); use PublicInbox::Qspawn; @@ -136,6 +136,12 @@ sub extract_diff ($$) { if ($cte =~ /\bquoted-printable\b/i && $part->crlf eq "\n") { $s =~ s/\r\n/\n/sg; } + + # Quiet "Complex regular subexpression recursion limit" warning. + # Not much we can do about it, but it's no longer relevant to + # Perl 5.3x (the warning was removed in 5.37.1, and actual + # recursino sometime before then). 
+ no warnings 'regexp'; $s =~ m!( # $1 start header lines we save for debugging: # everything before ^index is optional, but we don't @@ -170,7 +176,7 @@ sub extract_diff ($$) { (?:^---\x20$FN$LF) # "+++ b/foo.c" sets post-filename ($11) in case - # $3 is missing + # $3 is missing or truncated (?:^\+{3}\x20$FN$LF) # the meat of the diff, including "^\\No newline ..." @@ -187,7 +193,8 @@ sub extract_diff ($$) { mode_a => $5 // $8 // $4, # new (file) // unchanged // old }; my $path_a = $2 // $10; - my $path_b = $3 // $11; + my $path_b = defined $11 && defined $3 && length $11 > length $3 ? + $11 // $3 : $3 // $11; my $patch = $9; # don't care for leading 'a/' and 'b/' @@ -287,7 +294,7 @@ sub prepare_index ($) { dbg($self, 'preparing index'); my $rdr = { 0 => $in }; - my $cmd = [ qw(git update-index -z --index-info) ]; + my $cmd = [ git_exe, qw(update-index -z --index-info) ]; my $qsp = PublicInbox::Qspawn->new($cmd, $self->{git_env}, $rdr); $path_a = git_quote($path_a); $self->{-msg} = "index prepared:\n$mode_a $oid_full\t$path_a"; @@ -467,7 +474,7 @@ sub apply_result ($$) { # qx_cb skip_identical($self, $patches, $di->{oid_b}); } - my @cmd = qw(git ls-files -s -z); + my @cmd = (git_exe, qw(ls-files -s -z)); my $qsp = PublicInbox::Qspawn->new(\@cmd, $self->{git_env}); $self->{-cur_di} = $di; qsp_qx $self, $qsp, \&ls_files_result; @@ -478,7 +485,7 @@ sub do_git_apply ($) { my $patches = $self->{patches}; # we need --ignore-whitespace because some patches are CRLF - my @cmd = (qw(git apply --cached --ignore-whitespace + my @cmd = (git_exe, qw(apply --cached --ignore-whitespace --unidiff-zero --whitespace=warn --verbose)); my $len = length(join(' ', @cmd)); my $di; # keep track of the last one for "git ls-files" diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index e36659ce..e9e81e88 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -176,15 +176,15 @@ out: return (int)pid; } -static int sendmsg_retry(unsigned *tries) +static int 
sendmsg_retry(int *tries) { const struct timespec req = { 0, 100000000 }; /* 100ms */ int err = errno; switch (err) { case EINTR: PERL_ASYNC_CHECK(); return 1; case ENOBUFS: case ENOMEM: case ETOOMANYREFS: - if (++*tries >= 50) return 0; - fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n", + if (--*tries < 0) return 0; + fprintf(stderr, "# sleeping on sendmsg: %s (%d tries left)\n", strerror(err), *tries); nanosleep(&req, NULL); PERL_ASYNC_CHECK(); @@ -201,7 +201,7 @@ union my_cmsg { char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE]; }; -SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) +SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, int tries) { struct msghdr msg = { 0 }; union my_cmsg cmsg = { 0 }; @@ -211,7 +211,6 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) AV *fds = (AV *)SvRV(svfds); I32 i, nfds = av_len(fds) + 1; int *fdp; - unsigned tries = 0; if (SvOK(data)) { iov.iov_base = SvPV(data, dlen); @@ -332,6 +331,9 @@ EOM if (defined $all_libc) { # set for Gcf2 $ENV{PERL_INLINE_DIRECTORY} = $inline_dir; %RLIMITS = rlimit_map(); + *send_cmd4 = sub ($$$$;$) { + send_cmd4_($_[0], $_[1], $_[2], $_[3], 50); + } } else { require PublicInbox::SpawnPP; *pi_fork_exec = \&PublicInbox::SpawnPP::pi_fork_exec diff --git a/lib/PublicInbox/SpawnPP.pm b/lib/PublicInbox/SpawnPP.pm index f89d37d4..9ad4d0a1 100644 --- a/lib/PublicInbox/SpawnPP.pm +++ b/lib/PublicInbox/SpawnPP.pm @@ -7,7 +7,8 @@ package PublicInbox::SpawnPP; use v5.12; use POSIX qw(dup2 _exit setpgid :signal_h); -use autodie qw(chdir close fork pipe); +use autodie qw(chdir close pipe); +use PublicInbox::OnDestroy; # this is loaded by PublicInbox::Spawn, so we can't use/require it, here # Pure Perl implementation for folks that do not use Inline::C @@ -22,7 +23,7 @@ sub pi_fork_exec ($$$$$$$) { } sigprocmask(SIG_SETMASK, $set, $old) or die "SIG_SETMASK(set): $!"; pipe(my $r, my $w); - my $pid = fork; + my $pid = PublicInbox::OnDestroy::fork_tmp; if ($pid == 0) { close $r; 
$SIG{__DIE__} = sub { diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 829cfa3c..4cbe9623 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -317,6 +317,10 @@ BEGIN { ) } $PACK{CMSG_ALIGN_size} = SIZEOF_size_t; + $PACK{SIZEOF_cmsghdr} //= 0; + $PACK{TMPL_cmsg_len} //= undef; + $PACK{CMSG_DATA_off} //= undef; + $PACK{TMPL_msghdr} //= undef; } # SFD_CLOEXEC is arch-dependent, so IN_CLOEXEC may be, too @@ -463,8 +467,8 @@ if (defined($SYS_sendmsg) && defined($SYS_recvmsg)) { no warnings 'once'; require PublicInbox::CmdIPC4; -*send_cmd4 = sub ($$$$) { - my ($sock, $fds, undef, $flags) = @_; +*send_cmd4 = sub ($$$$;$) { + my ($sock, $fds, undef, $flags, $tries) = @_; my $iov = pack('P'.TMPL_size_t, $_[2] // NUL, length($_[2] // NUL) || 1); my $fd_space = scalar(@$fds) * SIZEOF_int; @@ -483,10 +487,10 @@ require PublicInbox::CmdIPC4; $msg_controllen, 0); # msg_flags my $s; - my $try = 0; + $tries //= 50; do { $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags); - } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($try)); + } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($tries)); $s >= 0 ? 
$s : undef; }; diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index 5f159683..00e96aee 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -17,9 +17,10 @@ my $lei_loud = $ENV{TEST_LEI_ERR_LOUD}; our $tail_cmd = $ENV{TAIL}; our ($lei_opt, $lei_out, $lei_err); use autodie qw(chdir close fcntl mkdir open opendir seek unlink); +$ENV{XDG_CACHE_HOME} //= "$ENV{HOME}/.cache"; # reuse C++ xap_helper builds $_ = File::Spec->rel2abs($_) for (grep(!m!^/!, @INC)); - +our $CURRENT_DAEMON; BEGIN { @EXPORT = qw(tmpdir tcp_server tcp_connect require_git require_mods run_script start_script key2sub xsys xsys_e xqx eml_load tick @@ -166,11 +167,8 @@ sub require_git ($;$) { sub require_git_http_backend (;$) { my ($nr) = @_; state $ok = do { - require PublicInbox::Git; - my $git = PublicInbox::Git::check_git_exe() or plan - skip_all => 'nothing in public-inbox works w/o git'; my $rdr = { 1 => \my $out, 2 => \my $err }; - xsys([$git, qw(http-backend)], undef, $rdr); + xsys([qw(git http-backend)], undef, $rdr); $out =~ /^Status:/ism; }; if (!$ok) { @@ -273,15 +271,16 @@ sub require_mods { sub key2script ($) { my ($key) = @_; - return $key if ($key eq 'git' || index($key, '/') >= 0); + require PublicInbox::Git; + return PublicInbox::Git::git_exe() if $key eq 'git'; + return $key if index($key, '/') >= 0; # n.b. we may have scripts which don't start with "public-inbox" in # the future: $key =~ s/\A([-\.])/public-inbox$1/; 'blib/script/'.$key; } -my @io_mode = ([ *STDIN{IO}, '+<&' ], [ *STDOUT{IO}, '+>&' ], - [ *STDERR{IO}, '+>&' ]); +my @io_mode = ([ \*STDIN, '+<&' ], [ \*STDOUT, '+>&' ], [ \*STDERR, '+>&' ]); sub _prepare_redirects ($) { my ($fhref) = @_; @@ -565,6 +564,9 @@ sub start_script { my $run_mode = $ENV{TEST_RUN_MODE} // $opt->{run_mode} // 2; my $sub = $run_mode == 0 ? 
undef : key2sub($key); my $tail; + my @xh = split(/\s+/, $ENV{TEST_DAEMON_XH} // ''); + @xh = () if $key !~ /-(?:imapd|netd|httpd|pop3d|nntpd)\z/; + push @argv, @xh; if ($tail_cmd) { my @paths; for (@argv) { @@ -588,9 +590,9 @@ sub start_script { require PublicInbox::DS; my $oset = PublicInbox::DS::block_signals(); require PublicInbox::OnDestroy; - my $tmp_mask = PublicInbox::OnDestroy->new( + my $tmp_mask = PublicInbox::OnDestroy::all( \&PublicInbox::DS::sig_setmask, $oset); - my $pid = PublicInbox::DS::do_fork(); + my $pid = PublicInbox::DS::fork_persist(); if ($pid == 0) { close($_) for (@{delete($opt->{-CLOFORK}) // []}); # pretend to be systemd (cf. sd_listen_fds(3)) @@ -612,7 +614,7 @@ sub start_script { $ENV{LISTEN_FDS} = $fds; } if ($opt->{-C}) { chdir($opt->{-C}) } - $0 = join(' ', @$cmd); + $0 = join(' ', @$cmd, @xh); local @SIG{keys %SIG} = map { undef } values %SIG; local $SIG{FPE} = 'IGNORE'; # Perl default undef $tmp_mask; @@ -720,7 +722,10 @@ SKIP: { require PublicInbox::Spawn; require PublicInbox::Config; require File::Path; - + eval { # use XDG_CACHE_HOME, first: + require PublicInbox::XapHelperCxx; + PublicInbox::XapHelperCxx::check_build(); + }; local %ENV = %ENV; delete $ENV{XDG_DATA_HOME}; delete $ENV{XDG_CONFIG_HOME}; @@ -945,6 +950,7 @@ sub test_httpd ($$;$$) { local $ENV{PLACK_TEST_EXTERNALSERVER_URI} = "http://$h:$p"; my $ua = LWP::UserAgent->new; $ua->max_redirect(0); + local $CURRENT_DAEMON = $td; Plack::Test::ExternalServer::test_psgi(client => $client, ua => $ua); $cb->() if $cb; diff --git a/lib/PublicInbox/Umask.pm b/lib/PublicInbox/Umask.pm index 00772ce5..2c859e65 100644 --- a/lib/PublicInbox/Umask.pm +++ b/lib/PublicInbox/Umask.pm @@ -58,7 +58,7 @@ sub _umask_for { sub with_umask { my ($self, $cb, @arg) = @_; my $old = umask($self->{umask} //= umask_prepare($self)); - my $restore = PublicInbox::OnDestroy->new($$, \&CORE::umask, $old); + my $restore = on_destroy \&CORE::umask, $old; $cb ? 
$cb->(@arg) : $restore; } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index fb259396..15a73158 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -507,13 +507,7 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx sub checkpoint ($;$) { my ($self, $wait) = @_; - if (my $im = $self->{im}) { - if ($wait) { - $im->barrier; - } else { - $im->checkpoint; - } - } + $self->{im}->barrier if $self->{im}; my $shards = $self->{idx_shards}; if ($shards) { my $dbh = $self->{mm}->{dbh} if $self->{mm}; @@ -1077,8 +1071,8 @@ sub unindex_todo ($$$) { return if $before == $after; # ensure any blob can not longer be accessed via dumb HTTP - run_die(['git', "--git-dir=$unit->{git}->{git_dir}", - qw(-c gc.reflogExpire=now gc --prune=all --quiet)]); + run_die($unit->{git}->cmd(qw(-c gc.reflogExpire=now gc + --prune=all --quiet))); } sub sync_ranges ($$) { diff --git a/lib/PublicInbox/View.pm b/lib/PublicInbox/View.pm index 44e1f2a8..9a90f939 100644 --- a/lib/PublicInbox/View.pm +++ b/lib/PublicInbox/View.pm @@ -74,9 +74,13 @@ sub msg_page { my ($id, $prev); my $next_arg = $ctx->{next_arg} = [ $ctx->{mid}, \$id, \$prev ]; - my $smsg = $ctx->{smsg} = $over->next_by_mid(@$next_arg) or - return; # undef == 404 - + my $smsg = $ctx->{smsg} = $over->next_by_mid(@$next_arg); + if (!$smsg && $ctx->{mid} =~ /\A\<(.+)\>\z/ and + ($next_arg->[0] = $1) and + ($over->next_by_mid(@$next_arg))) { + return PublicInbox::WWW::r301($ctx, undef, $next_arg->[0]); + } + $smsg or return; # undef=404 # allow user to easily browse the range around this message if # they have ->over $ctx->{-t_max} = $smsg->{ts}; @@ -307,7 +311,8 @@ sub eml_entry { " <a\nhref=\"${mhref}raw\">raw</a>" . 
" <a\nhref=\"${mhref}#R\">reply</a>"; - delete($ctx->{-qry}) and + # points to permalink + delete($ctx->{-qry_dfblob}) and $rv .= qq[ <a\nhref="${mhref}#related">related</a>]; my $hr; @@ -432,6 +437,7 @@ sub walk_thread ($$$) { sub pre_thread { # walk_thread callback my ($ctx, $level, $node, $idx) = @_; + # node->{mid} is deduplicated in PublicInbox::SearchThread::thread $ctx->{mapping}->{$node->{mid}} = [ '', $node, $idx, $level ]; skel_dump($ctx, $level, $node); } @@ -476,7 +482,7 @@ sub stream_thread_i { # PublicInbox::WwwStream::getline callback print { $ctx->zfh } ghost_index_entry($ctx, $lvl, $smsg) } else { # all done print { $ctx->zfh } thread_adj_level($ctx, 0), - ${delete($ctx->{skel})}; + @{delete($ctx->{skel})}; return; } } @@ -513,11 +519,13 @@ href="../../">newest</a>] EOF $skel .= "<b\nid=t>Thread overview:</b> "; $skel .= $nr == 1 ? '(only message)' : "$nr+ messages"; - $skel .= " (download: <a\nhref=\"../t.mbox.gz\">mbox.gz</a>"; - $skel .= " / follow: <a\nhref=\"../t.atom\">Atom feed</a>)\n"; - $skel .= "-- links below jump to the message on this page --\n"; + $skel .= <<EOM; + (download: <a\nhref="../t.mbox.gz">mbox.gz</a> follow: <a +href=\"../t.atom\">Atom feed</a> +-- links below jump to the message on this page -- +EOM $ctx->{cur_level} = 0; - $ctx->{skel} = \$skel; + $ctx->{skel} = [ $skel ]; $ctx->{prev_attr} = ''; $ctx->{prev_level} = 0; $ctx->{root_anchor} = 'm' . id_compress($mid, 1); @@ -529,9 +537,9 @@ EOF # reduce hash lookups in pre_thread->skel_dump $ctx->{-obfs_ibx} = $ibx->{obfuscate} ? 
$ibx : undef; - walk_thread($rootset, $ctx, \&pre_thread); + walk_thread($rootset, $ctx, \&pre_thread); # pushes to ctx->{skel} - $skel .= '</pre>'; + push @{$ctx->{skel}}, '</pre>'; return stream_thread($rootset, $ctx) unless $ctx->{flat}; # flat display: lazy load the full message from smsg @@ -553,8 +561,7 @@ sub thread_html_i { # PublicInbox::WwwStream::getline callback while (my $smsg = shift @{$ctx->{msgs}}) { return $smsg if exists($smsg->{blob}); } - my $skel = delete($ctx->{skel}) or return; # all done - print { $ctx->zfh } $$skel; + print { $ctx->zfh } @{delete $ctx->{skel} // []}; undef; } } @@ -698,6 +705,7 @@ href="d/">diff</a>)</pre><pre>]; } } my @subj = $eml->header('Subject'); + $ctx->{subj_raw} = $subj[0]; $hbuf .= "Subject: $_\n" for @subj; $title[0] = $subj[0] // '(no subject)'; $hbuf .= "Date: $_\n" for $eml->header('Date'); @@ -778,13 +786,13 @@ sub thread_skel ($$$) { my $ibx = $ctx->{ibx}; my ($nr, $msgs) = $ibx->over->get_thread($mid); my $parent = in_reply_to($hdr); - $$skel .= "\n<b>Thread overview: </b>"; + $skel->[-1] .= "\n<b>Thread overview: </b>"; if ($nr <= 1) { if (defined $parent) { - $$skel .= SKEL_EXPAND."\n "; - $$skel .= ghost_parent('../', $parent) . "\n"; + $skel->[-1] .= SKEL_EXPAND."\n "; + $skel->[-1] .= ghost_parent('../', $parent) . "\n"; } else { - $$skel .= "<a\nid=r>[no followups]</a> ". + $skel->[-1] .= "<a\nid=r>[no followups]</a> ". SKEL_EXPAND."\n"; } $ctx->{next_msg} = undef; @@ -792,8 +800,9 @@ sub thread_skel ($$$) { return; } - $$skel .= $nr; - $$skel .= '+ messages / '.SKEL_EXPAND.qq! <a\nhref="#b">top</a>\n!; + $skel->[-1] .= $nr; + $skel->[-1] .= '+ messages / '.SKEL_EXPAND. + qq! 
<a\nhref="#b">top</a>\n!; # nb: mutt only shows the first Subject in the index pane # when multiple Subject: headers are present, so we follow suit: @@ -811,25 +820,38 @@ sub thread_skel ($$$) { $ctx->{parent_msg} = $parent; } +sub dfqry_text ($$) { + my ($ctx, $subj) = @_; + my $qry_dfblob = delete $ctx->{-qry_dfblob} or return (undef); + my @bs = split /["\x{201c}\x{201d}]+/, $subj; + my $q = join ' ', (@bs ? ('(') : ()), map { + chop if length > 7; # include 1 abbrev "older" patches + "dfblob:$_"; + } @$qry_dfblob; + local $Text::Wrap::columns = COLS; + local $Text::Wrap::huge = 'overflow'; + $subj //= ''; + $subj =~ s/\A\s*(?:amend|fixup|squash)\!\s*//; # --autosquash commands + # split on double-quotes for phrases + $q = wrap('', '', $q); + if (@bs) { + $q .= " )\n OR ("; + $q .= qq[\nbs:"$_"] for @bs; + $q .= ' )'; + } + my $rows = ($q =~ tr/\n/\n/) + 1; + ($rows, ascii_html($q)); +} + # writes to zbuf sub html_footer { my ($ctx, $hdr) = @_; my $upfx = '../'; - my (@related, $skel); + my (@related, @skel); my $foot = '<pre>'; - my $qry = delete $ctx->{-qry}; - if ($qry && $ctx->{ibx}->isrch) { - my $q = ''; # search for either ancestor or descendent patches - for (@{$qry->{dfpre}}, @{$qry->{dfpost}}) { - chop if length > 7; # include 1 abbrev "older" patches - $q .= "dfblob:$_ "; - } - chop $q; # omit trailing SP - local $Text::Wrap::columns = COLS; - local $Text::Wrap::huge = 'overflow'; - $q = wrap('', '', $q); - my $rows = ($q =~ tr/\n/\n/) + 1; - $q = ascii_html($q); + my ($rows, $q) = dfqry_text $ctx, + delete($ctx->{-qry_subj}) // $ctx->{subj_raw}; + if ($rows && $ctx->{ibx}->isrch) { $related[0] = <<EOM; <form id=related action=$upfx @@ -847,12 +869,12 @@ EOM my $t = ts2str($ctx->{-t_max}); my $t_fmt = fmt_ts($ctx->{-t_max}); my $fallback = @related ? 
"\t" : "<a id=related>\t</a>"; - $skel = <<EOF; + $skel[0] = <<EOF; ${fallback}other threads:[<a href="$upfx?t=$t">~$t_fmt UTC</a>|<a href="$upfx">newest</a>] EOF - thread_skel(\$skel, $ctx, $hdr); + thread_skel(\@skel, $ctx, $hdr); my ($next, $prev); my $parent = ' '; $next = $prev = ' '; @@ -879,11 +901,11 @@ EOF } $foot .= "$next $prev$parent "; } else { # unindexed inboxes w/o over - $skel = qq( <a\nhref="$upfx">latest</a>); + $skel[0] = qq( <a\nhref="$upfx">latest</a>); } - # $skel may be big for big threads, don't append it to $foot + # @skel may be big for big threads, don't push to it print { $ctx->zfh } $foot, qq(<a\nhref="#R">reply</a>), - $skel, '</pre>', @related, + @skel, '</pre>', @related, msg_reply($ctx, $hdr); } @@ -985,7 +1007,8 @@ sub skel_dump { # walk_thread callback my $mid = $smsg->{mid}; if ($level == 0 && $ctx->{skel_dump_roots}++) { - $$skel .= delete($ctx->{sl_note}) || ''; + my $note = delete $ctx->{sl_note}; + push @$skel, $note if $note; } my $f = ascii_html(delete $smsg->{from_name}); @@ -1014,7 +1037,7 @@ sub skel_dump { # walk_thread callback if ($cur) { if ($cur eq $mid) { delete $ctx->{cur}; - $$skel .= "<b>$d<a\nid=r\nhref=\"#t\">". + push @$skel, "<b>$d<a\nid=r\nhref=\"#t\">". "$attr [this message]</a></b>\n"; return 1; } else { @@ -1054,8 +1077,7 @@ sub skel_dump { # walk_thread callback } else { $m = $ctx->{-upfx}.mid_href($mid).'/'; } - $$skel .= $d . "<a\nhref=\"$m\"$id>" . 
$end; - 1; + push @$skel, qq($d<a\nhref="$m"$id>$end); } sub _skel_ghost { @@ -1078,8 +1100,7 @@ sub _skel_ghost { } else { $d .= qq{<<a\nhref="$href">$html</a>>\n}; } - ${$ctx->{skel}} .= $d; - 1; + push @{$ctx->{skel}}, $d; } # note: we favor Date: here because git-send-email increments it diff --git a/lib/PublicInbox/ViewDiff.pm b/lib/PublicInbox/ViewDiff.pm index d078c5f9..7a6d9a2b 100644 --- a/lib/PublicInbox/ViewDiff.pm +++ b/lib/PublicInbox/ViewDiff.pm @@ -135,15 +135,14 @@ sub diff_header ($$$) { # no need to capture oid_a and oid_b on add/delete, # we just linkify OIDs directly via s///e in conditional if ($$x =~ s/$NULL_TO_BLOB/$1 . oid($dctx, $spfx, $2)/e) { - push @{$ctx->{-qry}->{dfpost}}, $2; + push @{$ctx->{-qry_dfblob}}, $2; } elsif ($$x =~ s/$BLOB_TO_NULL/'index '.oid($dctx, $spfx, $1).$2/e) { - push @{$ctx->{-qry}->{dfpre}}, $1; + push @{$ctx->{-qry_dfblob}}, $1; } elsif ($$x =~ $BLOB_TO_BLOB) { # modification-only, not add/delete: # linkify hunk headers later using oid_a and oid_b @$dctx{qw(oid_a oid_b)} = ($1, $2); - push @{$ctx->{-qry}->{dfpre}}, $1; - push @{$ctx->{-qry}->{dfpost}}, $2; + push @{$ctx->{-qry_dfblob}}, $1, $2; } else { warn "BUG? 
<$$x> had no ^index line"; } @@ -183,9 +182,10 @@ sub diff_before_or_after ($$) { sub flush_diff ($$) { my ($ctx, $cur) = @_; + my ($subj) = ($$cur =~ /^Subject:\s*\[[^\]]+\]\s*(.+?)$/sm); my @top = split($EXTRACT_DIFFS, $$cur); undef $$cur; # free memory - + $ctx->{-qry_subj} = $subj if $subj; my $lnk = $ctx->{-linkify}; my $dctx; # {}, keys: Q, oid_a, oid_b my $zfh = $ctx->zfh; diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm index 790b9a2c..b69214bc 100644 --- a/lib/PublicInbox/ViewVCS.pm +++ b/lib/PublicInbox/ViewVCS.pm @@ -25,6 +25,7 @@ use PublicInbox::Tmpfile; use PublicInbox::ViewDiff qw(flush_diff uri_escape_path); use PublicInbox::View; use PublicInbox::Eml; +use PublicInbox::OnDestroy; use Text::Wrap qw(wrap); use PublicInbox::Hval qw(ascii_html to_filename prurl utf8_maybe); use POSIX qw(strftime); @@ -105,7 +106,7 @@ sub stream_large_blob ($$) { my ($ctx, $res) = @_; $ctx->{-res} = $res; my ($git, $oid, $type, $size, $di) = @$res; - my $cmd = ['git', "--git-dir=$git->{git_dir}", 'cat-file', $type, $oid]; + my $cmd = $git->cmd('cat-file', $type, $oid); my $qsp = PublicInbox::Qspawn->new($cmd); $ctx->{env}->{'qspawn.wcb'} = $ctx->{-wcb}; $qsp->psgi_yield($ctx->{env}, undef, \&stream_blob_parse_hdr, $ctx); @@ -250,7 +251,7 @@ href="$ibx_url?t=$t" title="list contemporary emails">$2</a>) !e; - $ctx->{-title_html} = $s = $ctx->{-linkify}->to_html($s); + my $title_html = $ctx->{-title_html} = $ctx->{-linkify}->to_html($s); my ($P, $p, $pt) = delete @$ctx{qw(-cmt_P -cmt_p -cmt_pt)}; $_ = qq(<a href="$upfx$_/s/">).shift(@$p).'</a> '.shift(@$pt) for @$P; if (@$P == 1) { @@ -272,7 +273,7 @@ href="$f.patch">patch</a>)\n <a href=#parent>parent</a> $P->[0]}; author $au committer $co -<b>$s</b> +<b>$title_html</b> EOM print $zfh "\n", $ctx->{-linkify}->to_html($bdy) if length($bdy); undef $bdy; # free memory @@ -290,20 +291,8 @@ EOM # TODO: should there be another textarea which attempts to # search for the exact email which was applied to make 
this # commit? - if (my $qry = delete $ctx->{-qry}) { - my $q = ''; - for (@{$qry->{dfpost}}, @{$qry->{dfpre}}) { - # keep blobs as short as reasonable, emails - # are going to be older than what's in git - substr($_, 7, 64, ''); - $q .= "dfblob:$_ "; - } - chop $q; # no trailing SP - local $Text::Wrap::columns = PublicInbox::View::COLS; - local $Text::Wrap::huge = 'overflow'; - $q = wrap('', '', $q); - my $rows = ($q =~ tr/\n/\n/) + 1; - $q = ascii_html($q); + my ($rows, $q) = PublicInbox::View::dfqry_text $ctx, $s; + if ($rows) { my $ibx_url = ibx_url_for($ctx); my $alt; if (defined $ibx_url) { @@ -367,10 +356,9 @@ sub stream_patch_parse_hdr { # {parse_hdr} for Qspawn sub show_patch ($$) { my ($ctx, $res) = @_; my ($git, $oid) = @$res; - my @cmd = ('git', "--git-dir=$git->{git_dir}", - qw(format-patch -1 --stdout -C), + my $cmd = $git->cmd(qw(format-patch -1 --stdout -C), "--signature=git format-patch -1 --stdout -C $oid", $oid); - my $qsp = PublicInbox::Qspawn->new(\@cmd); + my $qsp = PublicInbox::Qspawn->new($cmd); $ctx->{env}->{'qspawn.wcb'} = $ctx->{-wcb}; $ctx->{patch_oid} = $oid; $qsp->psgi_yield($ctx->{env}, undef, \&stream_patch_parse_hdr, $ctx); @@ -388,7 +376,7 @@ sub show_commit ($$) { qw(--encoding=UTF-8 -z --no-notes --no-patch), $oid), undef, { 1 => $ctx->{patch_fh} }); $qsp_h->{qsp_err} = \($ctx->{-qsp_err_h} = ''); - my $cmt_fin = PublicInbox::OnDestroy->new($$, \&cmt_fin, $ctx); + my $cmt_fin = on_destroy \&cmt_fin, $ctx; $ctx->{git} = $git; $ctx->{oid} = $oid; $qsp_h->psgi_qx($ctx->{env}, undef, \&cmt_hdr_prep, $ctx, $cmt_fin); @@ -399,8 +387,8 @@ sub show_other ($$) { # just in case... my ($git, $oid, $type, $size) = @$res; $size > $MAX_SIZE and return html_page($ctx, 200, ascii_html($type)." $oid is too big to show\n". 
dbg_log($ctx)); - my $cmd = ['git', "--git-dir=$git->{git_dir}", - qw(show --encoding=UTF-8 --no-color --no-abbrev), $oid ]; + my $cmd = $git->cmd(qw(show --encoding=UTF-8 + --no-color --no-abbrev), $oid); my $qsp = PublicInbox::Qspawn->new($cmd); $qsp->{qsp_err} = \($ctx->{-qsp_err} = ''); $qsp->psgi_qx($ctx->{env}, undef, \&show_other_result, $ctx); @@ -486,8 +474,7 @@ sub show_tree ($$) { # also used by RepoTree my ($git, $oid, undef, $size) = @$res; $size > $MAX_SIZE and return html_page($ctx, 200, "tree $oid is too big to show\n". dbg_log($ctx)); - my $cmd = [ 'git', "--git-dir=$git->{git_dir}", - qw(ls-tree -z -l --no-abbrev), $oid ]; + my $cmd = $git->cmd(qw(ls-tree -z -l --no-abbrev), $oid); my $qsp = PublicInbox::Qspawn->new($cmd); $ctx->{tree_oid} = $oid; $qsp->{qsp_err} = \($ctx->{-qsp_err} = ''); @@ -624,7 +611,7 @@ sub start_solver ($) { my $v = $ctx->{qp}->{$from} // next; $ctx->{hints}->{$to} = $v if $v ne ''; } - $ctx->{-next_solver} = PublicInbox::OnDestroy->new($$, \&next_solver); + $ctx->{-next_solver} = on_destroy \&next_solver; ++$solver_nr; $ctx->{-tmp} = File::Temp->newdir("solver.$ctx->{oid_b}-XXXX", TMPDIR => 1); diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index 1ec574ea..eb90d353 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -445,7 +445,7 @@ sub imap_idle_reap { # awaitpid callback sub imap_idle_fork { my ($self, $uri, $intvl) = @_; return if $self->{quit}; - my $pid = PublicInbox::DS::do_fork; + my $pid = PublicInbox::DS::fork_persist; if ($pid == 0) { watch_atfork_child($self); watch_imap_idle_1($self, $uri, $intvl); @@ -506,7 +506,7 @@ sub poll_fetch_fork { # DS::add_timer callback my @imap = grep { # push() always returns > 0 $_->scheme =~ m!\Aimaps?!i ? 
1 : (push(@nntp, $_) < 0) } @$uris; - my $pid = PublicInbox::DS::do_fork; + my $pid = PublicInbox::DS::fork_persist; if ($pid == 0) { watch_atfork_child($self); watch_imap_fetch_all($self, \@imap) if @imap; diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm index 61aa7862..5e086fee 100644 --- a/lib/PublicInbox/WwwCoderepo.pm +++ b/lib/PublicInbox/WwwCoderepo.pm @@ -24,8 +24,9 @@ use PublicInbox::OnDestroy; use URI::Escape qw(uri_escape_utf8); use File::Spec; use autodie qw(fcntl open); +use PublicInbox::Git qw(git_exe); -my @EACH_REF = (qw(git for-each-ref --sort=-creatordate), +my @EACH_REF = (git_exe, qw(for-each-ref --sort=-creatordate), "--format=%(HEAD)%00".join('%00', map { "%($_)" } qw(objectname refname:short subject creatordate:short))); my $HEADS_CMD = <<''; @@ -87,7 +88,7 @@ sub new { my @s = stat(STDIN) or die "stat(STDIN): $!"; if ("@l[0, 1]" eq "@s[0, 1]") { my $f = fcntl(STDIN, F_GETFL, 0); - $self->{log_fh} = *STDIN{IO} if $f & O_RDWR; + $self->{log_fh} = \*STDIN if $f & O_RDWR; } $self; } @@ -186,7 +187,7 @@ EOM print $zfh "...\n" if $last; # README - my ($bref, $oid, $ref_path) = @{delete $ctx->{qx_res}->{readme}}; + my ($bref, $oid, $ref_path) = @{delete $ctx->{qx_res}->{readme} // []}; if ($bref) { my $l = PublicInbox::Linkify->new; $$bref =~ s/\s*\z//sm; @@ -249,11 +250,11 @@ sub summary ($$) { my $qsp_err = \($ctx->{-qsp_err} = ''); my %opt = (quiet => 1, 2 => $ctx->{wcr}->{log_fh}); my %env = (GIT_DIR => $ctx->{git}->{git_dir}); - my @log = (qw(git log), "-$nl", '--pretty=format:%d %H %h %cs %s'); + my @log = (git_exe, 'log', "-$nl", '--pretty=format:%d %H %h %cs %s'); push(@log, $tip) if defined $tip; # limit scope for MockHTTP test (t/solver_git.t) - my $END = PublicInbox::OnDestroy->new($$, \&summary_END, $ctx); + my $END = on_destroy \&summary_END, $ctx; for (['log', \@log], [ 'heads', [@EACH_REF, "--count=$nb", 'refs/heads'] ], [ 'tags', [@EACH_REF, "--count=$nt", 'refs/tags'] ]) { diff --git 
a/lib/PublicInbox/WwwText.pm b/lib/PublicInbox/WwwText.pm index 5e23005e..8279591a 100644 --- a/lib/PublicInbox/WwwText.pm +++ b/lib/PublicInbox/WwwText.pm @@ -37,7 +37,7 @@ sub get_text { } my $env = $ctx->{env}; if ($raw) { - my $h = delete $ctx->{-res_hdr}; + my $h = delete $ctx->{-res_hdr} // []; $txt = gzf_maybe($h, $env)->zflush($txt) if $code == 200; push @$h, 'Content-Type', 'text/plain', 'Content-Length', length($txt); diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm index 4dcbbe5d..24b3f45e 100644 --- a/lib/PublicInbox/XapClient.pm +++ b/lib/PublicInbox/XapClient.pm @@ -11,7 +11,8 @@ use v5.12; use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_SEQPACKET); use PublicInbox::IPC; -use autodie qw(fork pipe socketpair); +use autodie qw(pipe socketpair); +our $tries = 50; sub mkreq { my ($self, $ios, @arg) = @_; @@ -19,13 +20,14 @@ sub mkreq { pipe($r, $ios->[0]) if !defined($ios->[0]); my @fds = map fileno($_), @$ios; my $buf = join("\0", @arg, ''); - $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0) // - die "send_cmd: $!"; + $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0, $tries) + // die "send_cmd: $!"; $n == length($buf) or die "send_cmd: $n != ".length($buf); $r; } -sub start_helper { +sub start_helper (@) { + $PublicInbox::IPC::send_cmd or return; # can't work w/o SCM_RIGHTS my @argv = @_; socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0); my $cls = 'PublicInbox::XapHelperCxx'; diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index ed11a2f8..ba41b5d2 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -18,7 +18,7 @@ use POSIX qw(:signal_h); use Fcntl qw(LOCK_UN LOCK_EX); use Carp qw(croak); my $X = \%PublicInbox::Search::X; -our (%SRCH, %WORKERS, $nworker, $workerset, $in); +our (%SRCH, %WORKERS, $nworker, $workerset, $in, $SHARD_NFD, $MY_FD_MAX); our $stderr = \*STDERR; sub cmd_test_inspect { @@ -27,6 +27,8 @@ sub cmd_test_inspect 
{ ($req->{srch}->has_threadid ? 1 : 0) } +sub cmd_test_sleep { select(undef, undef, undef, 0.01) while 1 } + sub iter_retry_check ($) { if (ref($@) =~ /\bDatabaseModifiedError\b/) { $_[0]->{srch}->reopen; @@ -147,17 +149,8 @@ sub cmd_dump_roots { sub mset_iter ($$) { my ($req, $it) = @_; - eval { - my $buf = $it->get_docid; - $buf .= "\0".$it->get_percent if $req->{p}; - my $doc = ($req->{A} || $req->{D}) ? $it->get_document : undef; - for my $p (@{$req->{A}}) { - $buf .= "\0".$p.$_ for xap_terms($p, $doc); - } - $buf .= "\0".$doc->get_data if $req->{D}; - say { $req->{0} } $buf; - }; - $@ ? iter_retry_check($req) : 0; + say { $req->{0} } $it->get_docid, "\0", + $it->get_percent, "\0", $it->get_rank; } sub cmd_mset { # to be used by WWW + IMAP @@ -170,7 +163,8 @@ sub cmd_mset { # to be used by WWW + IMAP $opt->{eidx_key} = $req->{O} if defined $req->{O}; $opt->{threadid} = $req->{T} if defined $req->{T}; my $mset = $req->{srch}->mset($qry_str, $opt); - say { $req->{0} } 'mset.size=', $mset->size; + say { $req->{0} } 'mset.size=', $mset->size, + ' .get_matches_estimated=', $mset->get_matches_estimated; for my $it ($mset->items) { for (my $t = 10; $t > 0; --$t) { $t = mset_iter($req, $it) // $t; @@ -178,36 +172,76 @@ sub cmd_mset { # to be used by WWW + IMAP } } +sub srch_init_extra ($) { + my ($req) = @_; + my $qp = $req->{srch}->{qp}; + for (@{$req->{Q}}) { + my ($upfx, $m, $xpfx) = split /([:=])/; + $xpfx // die "E: bad -Q $_"; + $m = $m eq '=' ? 
'add_boolean_prefix' : 'add_prefix'; + $qp->$m($upfx, $xpfx); + } + $req->{srch}->{qp_extra_done} = 1; +} + sub dispatch { my ($req, $cmd, @argv) = @_; my $fn = $req->can("cmd_$cmd") or return; $GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC) or return; my $dirs = delete $req->{d} or die 'no -d args'; - my $key = join("\0", @$dirs); - $req->{srch} = $SRCH{$key} //= do { - my $new = { qp_flags => $PublicInbox::Search::QP_FLAGS }; + my $key = "-d\0".join("\0-d\0", @$dirs); + $key .= "\0".join("\0", map { ('-Q', $_) } @{$req->{Q}}) if $req->{Q}; + my $new; + $req->{srch} = $SRCH{$key} // do { + $new = { qp_flags => $PublicInbox::Search::QP_FLAGS }; + my $nfd = scalar(@$dirs) * PublicInbox::Search::SHARD_COST; + $SHARD_NFD += $nfd; + if ($SHARD_NFD > $MY_FD_MAX) { + $SHARD_NFD = $nfd; + %SRCH = (); + } my $first = shift @$dirs; - my $slow_phrase = -f "$first/iamchert"; - $new->{xdb} = $X->{Database}->new($first); - for (@$dirs) { - $slow_phrase ||= -f "$_/iamchert"; - $new->{xdb}->add_database($X->{Database}->new($_)); + for my $retried (0, 1) { + my $slow_phrase = -f "$first/iamchert"; + eval { + $new->{xdb} = $X->{Database}->new($first); + for (@$dirs) { + $slow_phrase ||= -f "$_/iamchert"; + $new->{xdb}->add_database( + $X->{Database}->new($_)) + } + }; + last unless $@; + if ($retried) { + die "E: $@\n"; + } else { # may be EMFILE/ENFILE/ENOMEM.... + warn "W: $@, retrying...\n"; + %SRCH = (); + $SHARD_NFD = $nfd; + } + $slow_phrase or $new->{qp_flags} + |= PublicInbox::Search::FLAG_PHRASE(); } - $slow_phrase or - $new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE(); bless $new, $req->{c} ? 
'PublicInbox::CodeSearch' : 'PublicInbox::Search'; $new->{qp} = $new->qparse_new; - $new; + $SRCH{$key} = $new; }; + $req->{srch}->{xdb}->reopen unless $new; + $req->{Q} && !$req->{srch}->{qp_extra_done} and + srch_init_extra $req; + my $timeo = $req->{K}; + alarm($timeo) if $timeo; $fn->($req, @argv); + alarm(0) if $timeo; } sub recv_loop { local $SIG{__WARN__} = sub { print $stderr @_ }; my $rbuf; local $SIG{TERM} = sub { undef $in }; + local $SIG{USR1} = \&reopen_logs; while (defined($in)) { PublicInbox::DS::sig_setmask($workerset); my @fds = eval { # we undef $in in SIG{TERM} @@ -219,7 +253,7 @@ sub recv_loop { } scalar(@fds) or exit(66); # EX_NOINPUT die "recvmsg: $!" if !defined($fds[0]); - PublicInbox::DS::block_signals(); + PublicInbox::DS::block_signals(POSIX::SIGALRM); my $req = bless {}, __PACKAGE__; my $i = 0; open($req->{$i++}, '+<&=', $_) for @fds; @@ -244,7 +278,7 @@ sub reap_worker { # awaitpid CB sub start_worker ($) { my ($nr) = @_; - my $pid = eval { PublicInbox::DS::do_fork } // return(warn($@)); + my $pid = eval { PublicInbox::DS::fork_persist } // return(warn($@)); if ($pid == 0) { undef %WORKERS; $SIG{TTIN} = $SIG{TTOU} = 'IGNORE'; @@ -271,6 +305,18 @@ sub do_sigttou { } } +sub reopen_logs { + my $p = $ENV{STDOUT_PATH}; + defined($p) && open(STDOUT, '>>', $p) and STDOUT->autoflush(1); + $p = $ENV{STDERR_PATH}; + defined($p) && open(STDERR, '>>', $p) and STDERR->autoflush(1); +} + +sub parent_reopen_logs { + reopen_logs(); + kill('USR1', values %WORKERS); +} + sub xh_alive { $in || scalar(keys %WORKERS) } sub start (@) { @@ -278,15 +324,19 @@ sub start (@) { my $c = getsockopt(local $in = \*STDIN, SOL_SOCKET, SO_TYPE); unpack('i', $c) == SOCK_SEQPACKET or die 'stdin is not SOCK_SEQPACKET'; - local (%SRCH, %WORKERS); + local (%SRCH, %WORKERS, $SHARD_NFD, $MY_FD_MAX); PublicInbox::Search::load_xapian(); $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or die 'bad args'; local $workerset = POSIX::SigSet->new; $workerset->fillset or 
die "fillset: $!"; - for (@PublicInbox::DS::UNBLOCKABLE) { + for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) { $workerset->delset($_) or die "delset($_): $!"; } + $MY_FD_MAX = PublicInbox::Search::ulimit_n // + die "E: unable to get RLIMIT_NOFILE: $!"; + warn "W: RLIMIT_NOFILE=$MY_FD_MAX too low\n" if $MY_FD_MAX < 72; + $MY_FD_MAX -= 64; local $nworker = $opt->{j}; return recv_loop() if $nworker == 0; @@ -303,6 +353,7 @@ sub start (@) { }, TTOU => \&do_sigttou, CHLD => \&PublicInbox::DS::enqueue_reap, + USR1 => \&parent_reopen_logs, }; PublicInbox::DS::block_signals(); start_workers(); diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm index eafe61a8..922bd583 100644 --- a/lib/PublicInbox/XapHelperCxx.pm +++ b/lib/PublicInbox/XapHelperCxx.pm @@ -16,8 +16,15 @@ use autodie; my $cxx = which($ENV{CXX} // 'c++') // which('clang') // die 'no C++ compiler'; my $dir = substr("$cxx-$Config{archname}", 1); # drop leading '/' $dir =~ tr!/!-!; -my $idir = ($ENV{XDG_CACHE_HOME} // - (($ENV{HOME} // die('HOME unset')).'/.cache')).'/public-inbox/jaot'; +my $idir; +if ((defined($ENV{XDG_CACHE_HOME}) && -d $ENV{XDG_CACHE_HOME}) || + (defined($ENV{HOME}) && -d $ENV{HOME})) { + $idir = ($ENV{XDG_CACHE_HOME} // + (($ENV{HOME} // die('HOME unset')).'/.cache') + ).'/public-inbox/jaot'; +} +$idir //= $ENV{PERL_INLINE_DIRECTORY} // + die 'HOME and PERL_INLINE_DIRECTORY unset'; substr($dir, 0, 0) = "$idir/"; my $bin = "$dir/xap_helper"; my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!); @@ -27,6 +34,7 @@ my $ldflags = '-Wl,-O1'; $ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd'; my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -pipe') . ' ' . ' -DTHREADID=' . PublicInbox::Search::THREADID . + ' -DSHARD_COST=' . PublicInbox::Search::SHARD_COST . ' -DXH_SPEC="'.join('', map { s/=.*/:/; $_ } @PublicInbox::Search::XH_SPEC) . '" ' . 
($ENV{LDFLAGS} // $ldflags); @@ -58,7 +66,11 @@ sub needs_rebuild () { sub build () { if (!-d $dir) { require File::Path; - File::Path::make_path($dir); + eval { File::Path::make_path($dir) }; + if (!-d $dir && defined($ENV{PERL_INLINE_DIRECTORY})) { + $dir = $ENV{PERL_INLINE_DIRECTORY}; + File::Path::make_path($dir); + } } require PublicInbox::CodeSearch; require PublicInbox::Lock; diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index 69f0af43..9a148ae4 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -103,7 +103,7 @@ sub commit_changes ($$$$) { sub cb_spawn { my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact() - my $pid = PublicInbox::DS::do_fork; + my $pid = PublicInbox::DS::fork_persist; return $pid if $pid > 0; $SIG{__DIE__} = sub { warn @_; _exit(1) }; # don't jump up stack $cb->($args, $opt); diff --git a/lib/PublicInbox/XhcMset.pm b/lib/PublicInbox/XhcMset.pm new file mode 100644 index 00000000..ac25eece --- /dev/null +++ b/lib/PublicInbox/XhcMset.pm @@ -0,0 +1,51 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# mocks Xapian::Mset and prevents slow queries from blocking the event loop +package PublicInbox::XhcMset; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::XhcMsetIterator; +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); + +sub event_step { + my ($self) = @_; + my ($cb, @args) = @{delete $self->{cb_args} // return}; + my $rd = $self->{sock}; + eval { + my $hdr = <$rd> // die "E: reading mset header: $!"; + for (split /\s+/, $hdr) { # read mset.size + estimated_matches + my ($k, $v) = split /=/, $_, 2; + $k =~ s/\A[^\.]*\.//; # s/(mset)?\./ + $self->{$k} = $v; + } + my $size = $self->{size} // die "E: bad xhc header: `$hdr'"; + my @it = map { PublicInbox::XhcMsetIterator::make($_) } <$rd>; + $self->{items} = \@it; + scalar(@it) == $size or die + 'E: got ',scalar(@it),", expected mset.size=$size"; + }; + my 
$err = $@; + $self->close; + eval { $cb->(@args, $self, $err) }; + warn "E: $@\n" if $@; +} + +sub maybe_new { + my (undef, $rd, $srch, @cb_args) = @_; + my $self = bless { cb_args => \@cb_args, srch => $srch }, __PACKAGE__; + if ($PublicInbox::DS::in_loop) { # async + $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT); + } else { # synchronous + $self->{sock} = $rd; + event_step($self); + undef; + } +} + +eval(join('', map { "sub $_ { \$_[0]->{$_} }\n" } qw(size + get_matches_estimated))); + +sub items { @{$_[0]->{items}} } + +1; diff --git a/lib/PublicInbox/XhcMsetIterator.pm b/lib/PublicInbox/XhcMsetIterator.pm new file mode 100644 index 00000000..dcfc61e4 --- /dev/null +++ b/lib/PublicInbox/XhcMsetIterator.pm @@ -0,0 +1,20 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# mocks Xapian::MsetIterator, there's many of these allocated at once +package PublicInbox::XhcMsetIterator; +use v5.12; + +sub make ($) { + chomp($_[0]); + my @self = map { $_ + 0 } split /\0/, $_[0]; # docid, pct, rank + # we don't store $xdb in self[4] since we avoid $it->get_document + # in favor of $xdb->get_document($it->get_docid) + bless \@self, __PACKAGE__; +} + +sub get_docid { $_[0]->[0] } +sub get_percent { $_[0]->[1] } +sub get_rank { $_[0]->[2] } + +1; diff --git a/lib/PublicInbox/khashl.h b/lib/PublicInbox/khashl.h new file mode 100644 index 00000000..170b81ff --- /dev/null +++ b/lib/PublicInbox/khashl.h @@ -0,0 +1,502 @@ +/* The MIT License + + Copyright (c) 2019-2023 by Attractive Chaos <attractor@live.co.uk> + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to 
do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +#ifndef __AC_KHASHL_H +#define __AC_KHASHL_H + +#define AC_VERSION_KHASHL_H "0.2" + +typedef uint32_t khint32_t; +typedef uint64_t khint64_t; + +typedef khint32_t khint_t; +typedef khint_t khiter_t; + +#define kh_inline inline +#define KH_LOCAL static kh_inline + +#ifndef kcalloc +#define kcalloc(N,Z) xcalloc(N,Z) +#endif +#ifndef kfree +#define kfree(P) free(P) +#endif + +/**************************** + * Simple private functions * + ****************************/ + +#define __kh_used(flag, i) (flag[i>>5] >> (i&0x1fU) & 1U) +#define __kh_set_used(flag, i) (flag[i>>5] |= 1U<<(i&0x1fU)) +#define __kh_set_unused(flag, i) (flag[i>>5] &= ~(1U<<(i&0x1fU))) + +#define __kh_fsize(m) ((m) < 32? 
1 : (m)>>5) + +static kh_inline khint_t __kh_h2b(khint_t hash, khint_t bits) { return hash * 2654435769U >> (32 - bits); } + +/******************* + * Hash table base * + *******************/ + +#define __KHASHL_TYPE(HType, khkey_t) \ + typedef struct HType { \ + khint_t bits, count; \ + khint32_t *used; \ + khkey_t *keys; \ + } HType; + +#define __KHASHL_PROTOTYPES(HType, prefix, khkey_t) \ + extern HType *prefix##_init(void); \ + extern void prefix##_destroy(HType *h); \ + extern void prefix##_clear(HType *h); \ + extern khint_t prefix##_getp(const HType *h, const khkey_t *key); \ + extern void prefix##_resize(HType *h, khint_t new_n_buckets); \ + extern khint_t prefix##_putp(HType *h, const khkey_t *key, int *absent); \ + extern void prefix##_del(HType *h, khint_t k); + +#define __KHASHL_IMPL_BASIC(SCOPE, HType, prefix) \ + SCOPE HType *prefix##_init(void) { \ + return (HType*)kcalloc(1, sizeof(HType)); \ + } \ + SCOPE void prefix##_release(HType *h) { \ + kfree((void *)h->keys); kfree(h->used); \ + } \ + SCOPE void prefix##_destroy(HType *h) { \ + if (!h) return; \ + prefix##_release(h); \ + kfree(h); \ + } \ + SCOPE void prefix##_clear(HType *h) { \ + if (h && h->used) { \ + khint_t n_buckets = (khint_t)1U << h->bits; \ + memset(h->used, 0, __kh_fsize(n_buckets) * sizeof(khint32_t)); \ + h->count = 0; \ + } \ + } + +#define __KHASHL_IMPL_GET(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + SCOPE khint_t prefix##_getp_core(const HType *h, const khkey_t *key, khint_t hash) { \ + khint_t i, last, n_buckets, mask; \ + if (!h->keys) return 0; \ + n_buckets = (khint_t)1U << h->bits; \ + mask = n_buckets - 1U; \ + i = last = __kh_h2b(hash, h->bits); \ + while (__kh_used(h->used, i) && !__hash_eq(h->keys[i], *key)) { \ + i = (i + 1U) & mask; \ + if (i == last) return n_buckets; \ + } \ + return !__kh_used(h->used, i)? 
n_buckets : i; \ + } \ + SCOPE khint_t prefix##_getp(const HType *h, const khkey_t *key) { return prefix##_getp_core(h, key, __hash_fn(*key)); } \ + SCOPE khint_t prefix##_get(const HType *h, khkey_t key) { return prefix##_getp_core(h, &key, __hash_fn(key)); } + +#define __KHASHL_IMPL_RESIZE(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + SCOPE void prefix##_resize(HType *h, khint_t new_n_buckets) { \ + khint32_t *new_used = NULL; \ + khint_t j = 0, x = new_n_buckets, n_buckets, new_bits, new_mask; \ + while ((x >>= 1) != 0) ++j; \ + if (new_n_buckets & (new_n_buckets - 1)) ++j; \ + new_bits = j > 2? j : 2; \ + new_n_buckets = (khint_t)1U << new_bits; \ + if (h->count > (new_n_buckets>>1) + (new_n_buckets>>2)) return; /* noop, requested size is too small */ \ + new_used = (khint32_t*)kcalloc(__kh_fsize(new_n_buckets), sizeof(khint32_t)); \ + n_buckets = h->keys? (khint_t)1U<<h->bits : 0U; \ + if (n_buckets < new_n_buckets) { /* expand */ \ + h->keys = (khkey_t *)xreallocarray(h->keys, \ + new_n_buckets, sizeof(khkey_t)); \ + } /* otherwise shrink */ \ + new_mask = new_n_buckets - 1; \ + for (j = 0; j != n_buckets; ++j) { \ + khkey_t key; \ + if (!__kh_used(h->used, j)) continue; \ + key = h->keys[j]; \ + __kh_set_unused(h->used, j); \ + while (1) { /* kick-out process; sort of like in Cuckoo hashing */ \ + khint_t i; \ + i = __kh_h2b(__hash_fn(key), new_bits); \ + while (__kh_used(new_used, i)) i = (i + 1) & new_mask; \ + __kh_set_used(new_used, i); \ + if (i < n_buckets && __kh_used(h->used, i)) { /* kick out the existing element */ \ + { khkey_t tmp = h->keys[i]; h->keys[i] = key; key = tmp; } \ + __kh_set_unused(h->used, i); /* mark it as deleted in the old hash table */ \ + } else { /* write the element and jump out of the loop */ \ + h->keys[i] = key; \ + break; \ + } \ + } \ + } \ + if (n_buckets > new_n_buckets) /* shrink the hash table */ \ + h->keys = (khkey_t *)xreallocarray(h->keys, \ + new_n_buckets, sizeof(khkey_t)); \ + kfree(h->used); /* 
free the working space */ \ + h->used = new_used, h->bits = new_bits; \ + } + +#define __KHASHL_IMPL_PUT(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + SCOPE khint_t prefix##_putp_core(HType *h, const khkey_t *key, khint_t hash, int *absent) { \ + khint_t n_buckets, i, last, mask; \ + n_buckets = h->keys? (khint_t)1U<<h->bits : 0U; \ + *absent = -1; \ + if (h->count >= (n_buckets>>1) + (n_buckets>>2)) { /* rehashing */ \ + prefix##_resize(h, n_buckets + 1U); \ + n_buckets = (khint_t)1U<<h->bits; \ + } /* TODO: to implement automatically shrinking; resize() already support shrinking */ \ + mask = n_buckets - 1; \ + i = last = __kh_h2b(hash, h->bits); \ + while (__kh_used(h->used, i) && !__hash_eq(h->keys[i], *key)) { \ + i = (i + 1U) & mask; \ + if (i == last) break; \ + } \ + if (!__kh_used(h->used, i)) { /* not present at all */ \ + h->keys[i] = *key; \ + __kh_set_used(h->used, i); \ + ++h->count; \ + *absent = 1; \ + } else *absent = 0; /* Don't touch h->keys[i] if present */ \ + return i; \ + } \ + SCOPE khint_t prefix##_putp(HType *h, const khkey_t *key, int *absent) { return prefix##_putp_core(h, key, __hash_fn(*key), absent); } \ + SCOPE khint_t prefix##_put(HType *h, khkey_t key, int *absent) { return prefix##_putp_core(h, &key, __hash_fn(key), absent); } + +#define __KHASHL_IMPL_DEL(SCOPE, HType, prefix, khkey_t, __hash_fn) \ + SCOPE int prefix##_del(HType *h, khint_t i) { \ + khint_t j = i, k, mask, n_buckets; \ + if (!h->keys) return 0; \ + n_buckets = (khint_t)1U<<h->bits; \ + mask = n_buckets - 1U; \ + while (1) { \ + j = (j + 1U) & mask; \ + if (j == i || !__kh_used(h->used, j)) break; /* j==i only when the table is completely full */ \ + k = __kh_h2b(__hash_fn(h->keys[j]), h->bits); \ + if ((j > i && (k <= i || k > j)) || (j < i && (k <= i && k > j))) \ + h->keys[i] = h->keys[j], i = j; \ + } \ + __kh_set_unused(h->used, i); \ + --h->count; \ + return 1; \ + } + +#define KHASHL_DECLARE(HType, prefix, khkey_t) \ + __KHASHL_TYPE(HType, 
khkey_t) \ + __KHASHL_PROTOTYPES(HType, prefix, khkey_t) + +#define KHASHL_INIT(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + __KHASHL_TYPE(HType, khkey_t) \ + __KHASHL_IMPL_BASIC(SCOPE, HType, prefix) \ + __KHASHL_IMPL_GET(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + __KHASHL_IMPL_RESIZE(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + __KHASHL_IMPL_PUT(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + __KHASHL_IMPL_DEL(SCOPE, HType, prefix, khkey_t, __hash_fn) + +#define KHASHE_INIT(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + KHASHL_INIT(KH_LOCAL, HType##_sub, prefix##_sub, khkey_t, __hash_fn, __hash_eq) \ + typedef struct HType { \ + khint64_t count:54, bits:8; \ + HType##_sub *sub; \ + } HType; \ + SCOPE HType *prefix##_init_sub(HType *g, size_t bits) { \ + g->bits = bits; \ + g->sub = (HType##_sub*)kcalloc(1U<<bits, sizeof(*g->sub)); \ + return g; \ + } \ + SCOPE HType *prefix##_init(void) { \ + HType *g; \ + g = (HType*)kcalloc(1, sizeof(*g)); \ + return prefix##_init_sub(g, 0); /* unsure about default */ \ + } \ + SCOPE void prefix##_release(HType *g) { \ + int t; \ + for (t = 0; t < 1<<g->bits; ++t) \ + prefix##_sub_release(&g->sub[t]); \ + kfree(g->sub); \ + } \ + SCOPE void prefix##_destroy(HType *g) { \ + if (!g) return; \ + prefix##_release(g); \ + kfree(g); \ + } \ + SCOPE void prefix##_clear(HType *g) { \ + int t; \ + if (!g) return; \ + for (t = 0; t < 1<<g->bits; ++t) \ + prefix##_sub_clear(&g->sub[t]); \ + } \ + SCOPE kh_ensitr_t prefix##_getp(const HType *g, const khkey_t *key) { \ + khint_t hash, low, ret; \ + kh_ensitr_t r; \ + HType##_sub *h; \ + hash = __hash_fn(*key); \ + low = hash & ((1U<<g->bits) - 1); \ + h = &g->sub[low]; \ + ret = prefix##_sub_getp_core(h, key, hash); \ + if (ret >= kh_end(h)) r.sub = low, r.pos = (khint_t)-1; \ + else r.sub = low, r.pos = ret; \ + return r; \ + } \ + SCOPE kh_ensitr_t prefix##_get(const HType *g, const khkey_t key) { return prefix##_getp(g, &key); } \ 
+ SCOPE kh_ensitr_t prefix##_putp(HType *g, const khkey_t *key, int *absent) { \ + khint_t hash, low, ret; \ + kh_ensitr_t r; \ + HType##_sub *h; \ + hash = __hash_fn(*key); \ + low = hash & ((1U<<g->bits) - 1); \ + h = &g->sub[low]; \ + ret = prefix##_sub_putp_core(h, key, hash, absent); \ + if (*absent) ++g->count; \ + if (ret == 1U<<h->bits) r.sub = low, r.pos = (khint_t)-1; \ + else r.sub = low, r.pos = ret; \ + return r; \ + } \ + SCOPE kh_ensitr_t prefix##_put(HType *g, const khkey_t key, int *absent) { return prefix##_putp(g, &key, absent); } \ + SCOPE int prefix##_del(HType *g, kh_ensitr_t itr) { \ + HType##_sub *h = &g->sub[itr.sub]; \ + int ret; \ + ret = prefix##_sub_del(h, itr.pos); \ + if (ret) --g->count; \ + return ret; \ + } + +/***************************** + * More convenient interface * + *****************************/ + +#define __kh_packed /* noop, we use -Werror=address-of-packed-member */ +#define __kh_cached_hash(x) ((x).hash) + +#define KHASHL_SET_INIT(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + typedef struct { khkey_t key; } __kh_packed HType##_s_bucket_t; \ + static kh_inline khint_t prefix##_s_hash(HType##_s_bucket_t x) { return __hash_fn(x.key); } \ + static kh_inline int prefix##_s_eq(HType##_s_bucket_t x, HType##_s_bucket_t y) { return __hash_eq(x.key, y.key); } \ + KHASHL_INIT(KH_LOCAL, HType, prefix##_s, HType##_s_bucket_t, prefix##_s_hash, prefix##_s_eq) \ + SCOPE HType *prefix##_init(void) { return prefix##_s_init(); } \ + SCOPE void prefix##_release(HType *h) { prefix##_s_release(h); } \ + SCOPE void prefix##_destroy(HType *h) { prefix##_s_destroy(h); } \ + SCOPE void prefix##_clear(HType *h) { prefix##_s_clear(h); } \ + SCOPE void prefix##_resize(HType *h, khint_t new_n_buckets) { prefix##_s_resize(h, new_n_buckets); } \ + SCOPE khint_t prefix##_get(const HType *h, khkey_t key) { HType##_s_bucket_t t; t.key = key; return prefix##_s_getp(h, &t); } \ + SCOPE int prefix##_del(HType *h, khint_t k) { return 
prefix##_s_del(h, k); } \ + SCOPE khint_t prefix##_put(HType *h, khkey_t key, int *absent) { HType##_s_bucket_t t; t.key = key; return prefix##_s_putp(h, &t, absent); } \ + +#define KHASHL_MAP_INIT(SCOPE, HType, prefix, khkey_t, kh_val_t, __hash_fn, __hash_eq) \ + typedef struct { khkey_t key; kh_val_t val; } __kh_packed HType##_m_bucket_t; \ + static kh_inline khint_t prefix##_m_hash(HType##_m_bucket_t x) { return __hash_fn(x.key); } \ + static kh_inline int prefix##_m_eq(HType##_m_bucket_t x, HType##_m_bucket_t y) { return __hash_eq(x.key, y.key); } \ + KHASHL_INIT(KH_LOCAL, HType, prefix##_m, HType##_m_bucket_t, prefix##_m_hash, prefix##_m_eq) \ + SCOPE HType *prefix##_init(void) { return prefix##_m_init(); } \ + SCOPE void prefix##_release(HType *h) { prefix##_m_release(h); } \ + SCOPE void prefix##_destroy(HType *h) { prefix##_m_destroy(h); } \ + SCOPE void prefix##_clear(HType *h) { prefix##_m_clear(h); } \ + SCOPE void prefix##_resize(HType *h, khint_t new_n_buckets) { prefix##_m_resize(h, new_n_buckets); } \ + SCOPE khint_t prefix##_get(const HType *h, khkey_t key) { HType##_m_bucket_t t; t.key = key; return prefix##_m_getp(h, &t); } \ + SCOPE int prefix##_del(HType *h, khint_t k) { return prefix##_m_del(h, k); } \ + SCOPE khint_t prefix##_put(HType *h, khkey_t key, int *absent) { HType##_m_bucket_t t; t.key = key; return prefix##_m_putp(h, &t, absent); } \ + +#define KHASHL_CSET_INIT(SCOPE, HType, prefix, khkey_t, __hash_fn, __hash_eq) \ + typedef struct { khkey_t key; khint_t hash; } __kh_packed HType##_cs_bucket_t; \ + static kh_inline int prefix##_cs_eq(HType##_cs_bucket_t x, HType##_cs_bucket_t y) { return x.hash == y.hash && __hash_eq(x.key, y.key); } \ + KHASHL_INIT(KH_LOCAL, HType, prefix##_cs, HType##_cs_bucket_t, __kh_cached_hash, prefix##_cs_eq) \ + SCOPE HType *prefix##_init(void) { return prefix##_cs_init(); } \ + SCOPE void prefix##_destroy(HType *h) { prefix##_cs_destroy(h); } \ + SCOPE khint_t prefix##_get(const HType *h, khkey_t key) { 
HType##_cs_bucket_t t; t.key = key; t.hash = __hash_fn(key); return prefix##_cs_getp(h, &t); } \ + SCOPE int prefix##_del(HType *h, khint_t k) { return prefix##_cs_del(h, k); } \ + SCOPE khint_t prefix##_put(HType *h, khkey_t key, int *absent) { HType##_cs_bucket_t t; t.key = key, t.hash = __hash_fn(key); return prefix##_cs_putp(h, &t, absent); } + +#define KHASHL_CMAP_INIT(SCOPE, HType, prefix, khkey_t, kh_val_t, __hash_fn, __hash_eq) \ + typedef struct { khkey_t key; kh_val_t val; khint_t hash; } __kh_packed HType##_cm_bucket_t; \ + static kh_inline int prefix##_cm_eq(HType##_cm_bucket_t x, HType##_cm_bucket_t y) { return x.hash == y.hash && __hash_eq(x.key, y.key); } \ + KHASHL_INIT(KH_LOCAL, HType, prefix##_cm, HType##_cm_bucket_t, __kh_cached_hash, prefix##_cm_eq) \ + SCOPE HType *prefix##_init(void) { return prefix##_cm_init(); } \ + SCOPE void prefix##_destroy(HType *h) { prefix##_cm_destroy(h); } \ + SCOPE khint_t prefix##_get(const HType *h, khkey_t key) { HType##_cm_bucket_t t; t.key = key; t.hash = __hash_fn(key); return prefix##_cm_getp(h, &t); } \ + SCOPE int prefix##_del(HType *h, khint_t k) { return prefix##_cm_del(h, k); } \ + SCOPE khint_t prefix##_put(HType *h, khkey_t key, int *absent) { HType##_cm_bucket_t t; t.key = key, t.hash = __hash_fn(key); return prefix##_cm_putp(h, &t, absent); } + +#define KHASHE_MAP_INIT(SCOPE, HType, prefix, khkey_t, kh_val_t, __hash_fn, __hash_eq) \ + typedef struct { khkey_t key; kh_val_t val; } __kh_packed HType##_m_bucket_t; \ + static kh_inline khint_t prefix##_m_hash(HType##_m_bucket_t x) { return __hash_fn(x.key); } \ + static kh_inline int prefix##_m_eq(HType##_m_bucket_t x, HType##_m_bucket_t y) { return __hash_eq(x.key, y.key); } \ + KHASHE_INIT(KH_LOCAL, HType, prefix##_m, HType##_m_bucket_t, prefix##_m_hash, prefix##_m_eq) \ + SCOPE HType *prefix##_init(void) { return prefix##_m_init(); } \ + SCOPE void prefix##_release(HType *h) { prefix##_m_release(h); } \ + SCOPE void prefix##_destroy(HType *h) { 
prefix##_m_destroy(h); } \ + SCOPE void prefix##_clear(HType *h) { prefix##_m_clear(h); } \ + SCOPE void prefix##_resize(HType *h, khint_t ignore) { /* noop */ } \ + SCOPE kh_ensitr_t prefix##_get(const HType *h, khkey_t key) { HType##_m_bucket_t t; t.key = key; return prefix##_m_getp(h, &t); } \ + SCOPE int prefix##_del(HType *h, kh_ensitr_t k) { return prefix##_m_del(h, k); } \ + SCOPE kh_ensitr_t prefix##_put(HType *h, khkey_t key, int *absent) { HType##_m_bucket_t t; t.key = key; return prefix##_m_putp(h, &t, absent); } \ + +/************************** + * Public macro functions * + **************************/ + +#define kh_bucket(h, x) ((h)->keys[x]) + +/*! @function + @abstract Get the number of elements in the hash table + @param h Pointer to the hash table + @return Number of elements in the hash table [khint_t] + */ +#define kh_size(h) ((h)->count) + +#define kh_capacity(h) ((h)->keys? 1U<<(h)->bits : 0U) + +/*! @function + @abstract Get the end iterator + @param h Pointer to the hash table + @return The end iterator [khint_t] + */ +#define kh_end(h) kh_capacity(h) + +/*! @function + @abstract Get key given an iterator + @param h Pointer to the hash table + @param x Iterator to the bucket [khint_t] + @return Key [type of keys] + */ +#define kh_key(h, x) ((h)->keys[x].key) + +/*! @function + @abstract Get value given an iterator + @param h Pointer to the hash table + @param x Iterator to the bucket [khint_t] + @return Value [type of values] + @discussion For hash sets, calling this results in segfault. + */ +#define kh_val(h, x) ((h)->keys[x].val) + +/*! @function + @abstract Alias of kh_val() + */ +#define kh_value(h, x) kh_val(h, x) + +/*! @function + @abstract Test whether a bucket contains data. 
+ @param h Pointer to the hash table + @param x Iterator to the bucket [khint_t] + @return 1 if containing data; 0 otherwise [int] + */ +#define kh_exist(h, x) __kh_used((h)->used, (x)) + +#define kh_ens_key(g, x) kh_key(&(g)->sub[(x).sub], (x).pos) +#define kh_ens_val(g, x) kh_val(&(g)->sub[(x).sub], (x).pos) +#define kh_ens_exist(g, x) kh_exist(&(g)->sub[(x).sub], (x).pos) +#define kh_ens_is_end(x) ((x).pos == (khint_t)-1) +#define kh_ens_size(g) ((g)->count) + +/************************************** + * Common hash and equality functions * + **************************************/ + +#define kh_eq_generic(a, b) ((a) == (b)) +#define kh_eq_str(a, b) (strcmp((a), (b)) == 0) +#define kh_hash_dummy(x) ((khint_t)(x)) + +static kh_inline khint_t kh_hash_uint32(khint_t key) { + key += ~(key << 15); + key ^= (key >> 10); + key += (key << 3); + key ^= (key >> 6); + key += ~(key << 11); + key ^= (key >> 16); + return key; +} + +static kh_inline khint_t kh_hash_uint64(khint64_t key) { + key = ~key + (key << 21); + key = key ^ key >> 24; + key = (key + (key << 3)) + (key << 8); + key = key ^ key >> 14; + key = (key + (key << 2)) + (key << 4); + key = key ^ key >> 28; + key = key + (key << 31); + return (khint_t)key; +} + +#define KH_FNV_SEED 11 + +static kh_inline khint_t kh_hash_str(const char *s) { /* FNV1a */ + khint_t h = KH_FNV_SEED ^ 2166136261U; + const unsigned char *t = (const unsigned char*)s; + for (; *t; ++t) + h ^= *t, h *= 16777619; + return h; +} + +static kh_inline khint_t kh_hash_bytes(int len, const unsigned char *s) { + khint_t h = KH_FNV_SEED ^ 2166136261U; + int i; + for (i = 0; i < len; ++i) + h ^= s[i], h *= 16777619; + return h; +} + +/*! @function + @abstract Get the start iterator + @param h Pointer to the hash table + @return The start iterator [khint_t] + */ +#define kh_begin(h) (khint_t)(0) + +/*! 
@function + @abstract Iterate over the entries in the hash table + @param h Pointer to the hash table + @param kvar Variable to which key will be assigned + @param vvar Variable to which value will be assigned + @param code Block of code to execute + */ +#define kh_foreach(h, kvar, vvar, code) { khint_t __i; \ + for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \ + if (!kh_exist(h,__i)) continue; \ + (kvar) = kh_key(h,__i); \ + (vvar) = kh_val(h,__i); \ + code; \ + } } + +#define kh_ens_foreach(g, kvar, vvar, code) do { \ + size_t t; \ + for (t = 0; t < 1<<g->bits; ++t) \ + kh_foreach(&g->sub[t], kvar, vvar, code); \ +} while (0) + +#define kh_ens_foreach_value(g, vvar, code) do { \ + size_t t; \ + for (t = 0; t < 1<<g->bits; ++t) \ + kh_foreach_value(&g->sub[t], vvar, code); \ +} while (0) + +/*! @function + @abstract Iterate over the values in the hash table + @param h Pointer to the hash table + @param vvar Variable to which value will be assigned + @param code Block of code to execute + */ +#define kh_foreach_value(h, vvar, code) { khint_t __i; \ + for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \ + if (!kh_exist(h,__i)) continue; \ + (vvar) = kh_val(h,__i); \ + code; \ + } } + +#endif /* __AC_KHASHL_H */ diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 3456910b..51ab48bf 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -7,7 +7,7 @@ * this is not linked to Perl in any way. * C (not C++) is used as much as possible to lower the contribution * barrier for hackers who mainly know C (this includes the maintainer). 
- * Yes, that means we use C stdlib stuff like hsearch and open_memstream + * Yes, that means we use C stdlib stuff like open_memstream * instead their equivalents in the C++ stdlib :P * Everything here is an unstable internal API of public-inbox and * NOT intended for ordinary users; only public-inbox hackers @@ -15,6 +15,9 @@ #ifndef _ALL_SOURCE # define _ALL_SOURCE #endif +#ifndef _GNU_SOURCE +# define _GNU_SOURCE +#endif #if defined(__NetBSD__) && !defined(_OPENBSD_SOURCE) // for reallocarray(3) # define _OPENBSD_SOURCE #endif @@ -27,13 +30,13 @@ #include <sys/types.h> #include <sys/uio.h> #include <sys/wait.h> +#include <poll.h> #include <assert.h> #include <err.h> // BSD, glibc, and musl all have this #include <errno.h> #include <fcntl.h> #include <limits.h> -#include <search.h> #include <signal.h> #include <stddef.h> #include <stdint.h> @@ -82,6 +85,62 @@ #define ABORT(...) do { warnx(__VA_ARGS__); abort(); } while (0) #define EABORT(...) do { warn(__VA_ARGS__); abort(); } while (0) +static void *xcalloc(size_t nmemb, size_t size) +{ + void *ret = calloc(nmemb, size); + if (!ret) EABORT("calloc(%zu, %zu)", nmemb, size); + return ret; +} + +#if defined(__GLIBC__) && defined(__GLIBC_MINOR__) && \ + MY_VER(__GLIBC__, __GLIBC_MINOR__, 0) >= MY_VER(2, 28, 0) +# define HAVE_REALLOCARRAY 1 +#elif defined(__OpenBSD__) || defined(__DragonFly__) || \ + defined(__FreeBSD__) || defined(__NetBSD__) +# define HAVE_REALLOCARRAY 1 +#endif + +static void *xreallocarray(void *ptr, size_t nmemb, size_t size) +{ +#ifdef HAVE_REALLOCARRAY + void *ret = reallocarray(ptr, nmemb, size); +#else // can't rely on __builtin_mul_overflow in gcc 4.x :< + void *ret = NULL; + if (nmemb && size > SIZE_MAX / nmemb) + errno = ENOMEM; + else + ret = realloc(ptr, nmemb * size); +#endif + if (!ret) EABORT("reallocarray(..., %zu, %zu)", nmemb, size); + return ret; +} + +#include "khashl.h" + +struct srch { + int ckey_len; // int for comparisons + unsigned qp_flags; + bool qp_extra_done; + 
Xapian::Database *db; + Xapian::QueryParser *qp; + unsigned char ckey[]; // $shard_path0\0$shard_path1\0... +}; + +static khint_t srch_hash(const struct srch *srch) +{ + return kh_hash_bytes(srch->ckey_len, srch->ckey); +} + +static int srch_eq(const struct srch *a, const struct srch *b) +{ + return a->ckey_len == b->ckey_len ? + !memcmp(a->ckey, b->ckey, (size_t)a->ckey_len) : 0; +} + +KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *, + srch_hash, srch_eq) +static srch_set *srch_cache; +static long my_fd_max, shard_nfd; // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET static volatile int sock_fd = STDIN_FILENO; static sigset_t fullset, workerset; @@ -90,11 +149,12 @@ static bool alive = true; static FILE *orig_err = stderr; #endif static int orig_err_fd = -1; -static void *srch_tree; // tsearch + tdelete + twalk static pid_t *worker_pids; // nr => pid #define WORKER_MAX USHRT_MAX static unsigned long nworker, nworker_hwm; static int pipefds[2]; +static const char *stdout_path, *stderr_path; // for SIGUSR1 +static sig_atomic_t worker_needs_reopen; // PublicInbox::Search and PublicInbox::CodeSearch generate these: static void mail_nrp_init(void); @@ -108,14 +168,6 @@ enum exc_iter { ITER_ABORT }; -struct srch { - int paths_len; // int for comparisons - unsigned qp_flags; - Xapian::Database *db; - Xapian::QueryParser *qp; - char paths[]; // $shard_path0\0$shard_path1\0... 
-}; - #define MY_ARG_MAX 256 typedef bool (*cmd)(struct req *); @@ -123,6 +175,8 @@ typedef bool (*cmd)(struct req *); struct req { // argv and pfxv point into global rbuf char *argv[MY_ARG_MAX]; char *pfxv[MY_ARG_MAX]; // -A <prefix> + char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX> + char *dirv[MY_ARG_MAX]; // -d /path/to/XDB(shard) size_t *lenv; // -A <prefix>LENGTH struct srch *srch; char *Pgit_dir; @@ -134,15 +188,12 @@ struct req { // argv and pfxv point into global rbuf unsigned long timeout_sec; size_t nr_out; long sort_col; // value column, negative means BoolWeight - int argc; - int pfxc; + int argc, pfxc, qpfxc, dirc; FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional) bool has_input; // fp[0] is bidirectional bool collapse_threads; bool code_search; bool relevance; // sort by relevance before column - bool emit_percent; - bool emit_docdata; bool asc; // ascending sort }; @@ -226,6 +277,13 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str) qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, Xapian::Query(req->Oeidx_key)); } + // TODO: uid_range + if (req->threadid != ULLONG_MAX) { + std::string tid = Xapian::sortable_serialise(req->threadid); + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(Xapian::Query::OP_VALUE_RANGE, THREADID, + tid, tid)); + } Xapian::Enquire enq = prep_enquire(req); enq.set_query(qry); // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm @@ -364,25 +422,6 @@ static size_t off2size(off_t n) return (size_t)n; } -static char *hsearch_enter_key(char *s) -{ -#if defined(__OpenBSD__) || defined(__DragonFly__) - // hdestroy frees each key on some platforms, - // so give it something to free: - char *ret = strdup(s); - if (!ret) err(EXIT_FAILURE, "strdup"); - return ret; -// AFAIK there's no way to detect musl, assume non-glibc Linux is musl: -#elif defined(__GLIBC__) || defined(__linux__) || \ - defined(__FreeBSD__) || defined(__NetBSD__) - // do 
nothing on these platforms -#else -#warning untested platform detected, unsure if hdestroy(3) frees keys -#warning contact us at meta@public-inbox.org if you get segfaults -#endif - return s; -} - // for test usage only, we need to ensure the compiler supports // __cleanup__ when exceptions are thrown struct inspect { struct req *req; }; @@ -406,6 +445,11 @@ static bool cmd_test_inspect(struct req *req) return false; } +static bool cmd_test_sleep(struct req *req) +{ + for (;;) poll(NULL, 0, 10); + return false; +} #include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff #include "xh_cidx.h" // CodeSearchIdx.pm stuff @@ -420,6 +464,7 @@ static const struct cmd_entry { CMD(dump_ibx), // many inboxes CMD(dump_roots), // per-cidx shard CMD(test_inspect), // least common commands last + CMD(test_sleep), // least common commands last }; #define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) @@ -495,15 +540,6 @@ again: return false; } -static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch -{ - const struct srch *a = (const struct srch *)pa; - const struct srch *b = (const struct srch *)pb; - int diff = a->paths_len - b->paths_len; - - return diff ? 
diff : memcmp(a->paths, b->paths, (size_t)a->paths_len); -} - static bool is_chert(const char *dir) { char iamchert[PATH_MAX]; @@ -517,49 +553,85 @@ static bool is_chert(const char *dir) return false; } -static bool srch_init(struct req *req) +static void srch_free(struct srch *srch) +{ + delete srch->qp; + delete srch->db; + free(srch); +} + +static void srch_cache_renew(struct srch *keep) +{ + khint_t k; + + // can't delete while iterating, so just free each + clear + for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) { + if (!kh_exist(srch_cache, k)) continue; + struct srch *cur = kh_key(srch_cache, k); + + if (cur != keep) + srch_free(cur); + } + srch_set_cs_clear(srch_cache); + if (keep) { + int absent; + k = srch_set_put(srch_cache, keep, &absent); + assert(absent); + assert(k < kh_end(srch_cache)); + } +} + +static void srch_init(struct req *req) { - char *dirv[MY_ARG_MAX]; int i; struct srch *srch = req->srch; - int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; - srch->qp_flags = FLAG_PHRASE | - Xapian::QueryParser::FLAG_BOOLEAN | + srch->qp_flags = Xapian::QueryParser::FLAG_BOOLEAN | Xapian::QueryParser::FLAG_LOVEHATE | Xapian::QueryParser::FLAG_WILDCARD; - if (is_chert(dirv[0])) - srch->qp_flags &= ~FLAG_PHRASE; - try { - srch->db = new Xapian::Database(dirv[0]); - } catch (...) 
{ - warn("E: Xapian::Database(%s)", dirv[0]); - return false; + long nfd = req->dirc * SHARD_COST; + + shard_nfd += nfd; + if (shard_nfd > my_fd_max) { + srch_cache_renew(srch); + shard_nfd = nfd; } - try { - for (i = 1; i < dirc; i++) { - if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i])) + for (int retried = 0; retried < 2; retried++) { + srch->qp_flags |= FLAG_PHRASE; + i = 0; + try { + srch->db = new Xapian::Database(req->dirv[i]); + if (is_chert(req->dirv[0])) srch->qp_flags &= ~FLAG_PHRASE; - srch->db->add_database(Xapian::Database(dirv[i])); + for (i = 1; i < req->dirc; i++) { + const char *dir = req->dirv[i]; + if (srch->qp_flags & FLAG_PHRASE && + is_chert(dir)) + srch->qp_flags &= ~FLAG_PHRASE; + srch->db->add_database(Xapian::Database(dir)); + } + break; + } catch (const Xapian::Error & e) { + warnx("E: Xapian::Error: %s (%s)", + e.get_description().c_str(), req->dirv[i]); + } catch (...) { // does this happen? + warn("E: add_database(%s)", req->dirv[i]); + } + if (retried) { + errx(EXIT_FAILURE, "E: can't open %s", req->dirv[i]); + } else { + warnx("retrying..."); + if (srch->db) + delete srch->db; + srch->db = NULL; + srch_cache_renew(srch); } - } catch (...) { - warn("E: add_database(%s)", dirv[i]); - return false; - } - try { - srch->qp = new Xapian::QueryParser; - } catch (...) { - perror("E: Xapian::QueryParser"); - return false; } + // these will raise and die on ENOMEM or other errors + srch->qp = new Xapian::QueryParser; srch->qp->set_default_op(Xapian::Query::OP_AND); srch->qp->set_database(*srch->db); - try { - srch->qp->set_stemmer(Xapian::Stem("english")); - } catch (...) 
{ - perror("E: Xapian::Stem"); - return false; - } + srch->qp->set_stemmer(Xapian::Stem("english")); srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); srch->qp->SET_MAX_EXPANSION(100); @@ -567,15 +639,31 @@ static bool srch_init(struct req *req) qp_init_code_search(srch->qp); // CodeSearch.pm else qp_init_mail_search(srch->qp); // Search.pm - return true; } -static void free_srch(void *p) // tdestroy +// setup query parser for altid and arbitrary headers +static void srch_init_extra(struct req *req) { - struct srch *srch = (struct srch *)p; - delete srch->qp; - delete srch->db; - free(srch); + const char *XPFX; + for (int i = 0; i < req->qpfxc; i++) { + size_t len = strlen(req->qpfxv[i]); + char *c = (char *)memchr(req->qpfxv[i], '=', len); + + if (c) { // it's boolean "gmane=XGMANE" + XPFX = c + 1; + *c = 0; + req->srch->qp->add_boolean_prefix(req->qpfxv[i], XPFX); + continue; + } + // maybe it's a non-boolean prefix "blob:XBLOBID" + c = (char *)memchr(req->qpfxv[i], ':', len); + if (!c) + errx(EXIT_FAILURE, "bad -Q %s", req->qpfxv[i]); + XPFX = c + 1; + *c = 0; + req->srch->qp->add_prefix(req->qpfxv[i], XPFX); + } + req->srch->qp_extra_done = true; } static void dispatch(struct req *req) @@ -588,7 +676,6 @@ static void dispatch(struct req *req) } kbuf; char *end; FILE *kfp; - struct srch **s; req->threadid = ULLONG_MAX; for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) { if (cmds[c].fn_len == size && @@ -602,7 +689,7 @@ static void dispatch(struct req *req) kfp = open_memstream(&kbuf.ptr, &size); if (!kfp) err(EXIT_FAILURE, "open_memstream(kbuf)"); // write padding, first (contents don't matter) - fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); + fwrite(&req->argv[0], offsetof(struct srch, ckey), 1, kfp); // global getopt variables: optopt = 0; @@ -614,7 +701,11 @@ static void dispatch(struct req *req) switch (c) { case 'a': req->asc = true; break; case 'c': req->code_search = true; break; - case 'd': fwrite(optarg, strlen(optarg) + 1, 1, 
kfp); break; + case 'd': + req->dirv[req->dirc++] = optarg; + if (MY_ARG_MAX == req->dirc) ABORT("too many -d"); + fprintf(kfp, "-d%c%s%c", 0, optarg, 0); + break; case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix case 'k': req->sort_col = strtol(optarg, &end, 10); @@ -633,7 +724,6 @@ static void dispatch(struct req *req) if (*end || req->off == ULLONG_MAX) ABORT("-o %s", optarg); break; - case 'p': req->emit_percent = true; break; case 'r': req->relevance = true; break; case 't': req->collapse_threads = true; break; case 'A': @@ -641,7 +731,6 @@ static void dispatch(struct req *req) if (MY_ARG_MAX == req->pfxc) ABORT("too many -A"); break; - case 'D': req->emit_docdata = true; break; case 'K': req->timeout_sec = strtoul(optarg, &end, 10); if (*end || req->timeout_sec == ULONG_MAX) @@ -653,28 +742,38 @@ static void dispatch(struct req *req) if (*end || req->threadid == ULLONG_MAX) ABORT("-T %s", optarg); break; + case 'Q': + req->qpfxv[req->qpfxc++] = optarg; + if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q"); + fprintf(kfp, "-Q%c%s%c", 0, optarg, 0); + break; default: ABORT("bad switch `-%c'", c); } } ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch kbuf.srch->db = NULL; kbuf.srch->qp = NULL; - kbuf.srch->paths_len = size - offsetof(struct srch, paths); - if (kbuf.srch->paths_len <= 0) - ABORT("no -d args"); - s = (struct srch **)tsearch(kbuf.srch, &srch_tree, srch_cmp); - if (!s) err(EXIT_FAILURE, "tsearch"); // likely ENOMEM - req->srch = *s; - if (req->srch != kbuf.srch) { // reuse existing - free_srch(kbuf.srch); - } else if (!srch_init(req)) { - assert(kbuf.srch == *((struct srch **)tfind( - kbuf.srch, &srch_tree, srch_cmp))); - void *del = tdelete(kbuf.srch, &srch_tree, srch_cmp); - assert(del); - free_srch(kbuf.srch); - goto cmd_err; // srch_init already warned + kbuf.srch->qp_extra_done = false; + kbuf.srch->ckey_len = size - offsetof(struct srch, ckey); + if (kbuf.srch->ckey_len <= 0 || !req->dirc) + ABORT("no -d args (or too 
many)"); + + int absent; + khint_t ki = srch_set_put(srch_cache, kbuf.srch, &absent); + assert(ki < kh_end(srch_cache)); + req->srch = kh_key(srch_cache, ki); + if (absent) { + srch_init(req); + } else { + assert(req->srch != kbuf.srch); + srch_free(kbuf.srch); + req->srch->db->reopen(); } + if (req->qpfxc && !req->srch->qp_extra_done) + srch_init_extra(req); + if (req->timeout_sec) + alarm(req->timeout_sec > UINT_MAX ? + UINT_MAX : (unsigned)req->timeout_sec); try { if (!req->fn(req)) warnx("`%s' failed", req->argv[0]); @@ -683,8 +782,8 @@ static void dispatch(struct req *req) } catch (...) { warn("unhandled exception"); } -cmd_err: - return; // just be silent on errors, for now + if (req->timeout_sec) + alarm(0); } static void cleanup_pids(void) @@ -723,9 +822,12 @@ static void stderr_restore(FILE *tmp_err) clearerr(stderr); } -static void sigw(int sig) // SIGTERM handler for worker +static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker { - sock_fd = -1; // break out of recv_loop + switch (sig) { + case SIGUSR1: worker_needs_reopen = 1; break; + default: sock_fd = -1; // break out of recv_loop + } } #define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup))) @@ -735,6 +837,18 @@ static void req_cleanup(void *ptr) free(req->lenv); } +static void reopen_logs(void) +{ + if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout)) + err(EXIT_FAILURE, "freopen %s", stdout_path); + if (stderr_path && *stderr_path) { + if (!freopen(stderr_path, "a", stderr)) + err(EXIT_FAILURE, "freopen %s", stderr_path); + if (my_setlinebuf(stderr)) + err(EXIT_FAILURE, "setlinebuf(stderr)"); + } +} + static void recv_loop(void) // worker process loop { static char rbuf[4096 * 33]; // per-process @@ -742,6 +856,7 @@ static void recv_loop(void) // worker process loop sa.sa_handler = sigw; CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); while (sock_fd == 0) { size_t len = sizeof(rbuf); @@ -758,6 +873,10 @@ static void 
recv_loop(void) // worker process loop stderr_restore(req.fp[1]); ERR_CLOSE(req.fp[1], 0); } + if (worker_needs_reopen) { + worker_needs_reopen = 0; + reopen_logs(); + } } } @@ -804,10 +923,21 @@ static void start_workers(void) static void cleanup_all(void) { cleanup_pids(); -#ifdef __GLIBC__ - tdestroy(srch_tree, free_srch); - srch_tree = NULL; -#endif + if (!srch_cache) + return; + srch_cache_renew(NULL); + srch_set_destroy(srch_cache); + srch_cache = NULL; +} + +static void parent_reopen_logs(void) +{ + reopen_logs(); + for (unsigned long nr = nworker; nr < nworker_hwm; nr++) { + pid_t pid = worker_pids[nr]; + if (pid != 0 && kill(pid, SIGUSR1)) + warn("BUG?: kill(%d, SIGUSR1)", (int)pid); + } } static void sigp(int sig) // parent signal handler @@ -822,6 +952,7 @@ static void sigp(int sig) // parent signal handler case SIGCHLD: c = '.'; break; case SIGTTOU: c = '-'; break; case SIGTTIN: c = '+'; break; + case SIGUSR1: c = '#'; break; default: write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1); _exit(EXIT_FAILURE); @@ -928,14 +1059,25 @@ int main(int argc, char *argv[]) { int c; socklen_t slen = (socklen_t)sizeof(c); + stdout_path = getenv("STDOUT_PATH"); + stderr_path = getenv("STDERR_PATH"); + struct rlimit rl; if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) err(EXIT_FAILURE, "getsockopt"); if (c != SOCK_SEQPACKET) errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET"); + if (getrlimit(RLIMIT_NOFILE, &rl)) + err(EXIT_FAILURE, "getrlimit"); + my_fd_max = rl.rlim_cur; + if (my_fd_max < 72) + warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max); + my_fd_max -= 64; + mail_nrp_init(); code_nrp_init(); + srch_cache = srch_set_init(); atexit(cleanup_all); if (!STDERR_ASSIGNABLE) { @@ -945,12 +1087,6 @@ int main(int argc, char *argv[]) } nworker = 1; -#ifdef _SC_NPROCESSORS_ONLN - long j = sysconf(_SC_NPROCESSORS_ONLN); - if (j > 0) - nworker = j > WORKER_MAX ? 
WORKER_MAX : j; -#endif // _SC_NPROCESSORS_ONLN - // make warn/warnx/err multi-process friendly: if (my_setlinebuf(stderr)) err(EXIT_FAILURE, "setlinebuf(stderr)"); @@ -992,6 +1128,8 @@ int main(int argc, char *argv[]) DELSET(SIGXCPU); DELSET(SIGXFSZ); #undef DELSET + CHECK(int, 0, sigdelset(&workerset, SIGUSR1)); + CHECK(int, 0, sigdelset(&fullset, SIGALRM)); if (nworker == 0) { // no SIGTERM handling w/o workers recv_loop(); @@ -1000,8 +1138,7 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigdelset(&workerset, SIGTERM)); CHECK(int, 0, sigdelset(&workerset, SIGCHLD)); nworker_hwm = nworker; - worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); - if (!worker_pids) err(EXIT_FAILURE, "calloc"); + worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t)); if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); int fl = fcntl(pipefds[1], F_GETFL); @@ -1012,10 +1149,12 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigdelset(&pset, SIGCHLD)); CHECK(int, 0, sigdelset(&pset, SIGTTIN)); CHECK(int, 0, sigdelset(&pset, SIGTTOU)); + CHECK(int, 0, sigdelset(&pset, SIGUSR1)); struct sigaction sa = {}; sa.sa_handler = sigp; + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL)); sa.sa_flags = SA_NOCLDSTOP; @@ -1040,6 +1179,7 @@ int main(int argc, char *argv[]) case '.': break; // do_sigchld already called case '-': do_sigttou(); break; case '+': do_sigttin(); break; + case '#': parent_reopen_logs(); break; default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); } } diff --git a/lib/PublicInbox/xh_cidx.h b/lib/PublicInbox/xh_cidx.h index 311ca05f..8cc6a845 100644 --- a/lib/PublicInbox/xh_cidx.h +++ b/lib/PublicInbox/xh_cidx.h @@ -3,11 +3,38 @@ // This file is only intended to be included by xap_helper.h // it implements pieces used by CodeSearchIdx.pm +// TODO: consider making PublicInbox::CodeSearchIdx emit binary +// (20 or 32-bit) OIDs instead of ASCII hex. 
It would require +// more code in both Perl and C++, though... + +// assumes trusted data from same host +static inline unsigned int hex2uint(char c) +{ + switch (c) { + case '0' ... '9': return c - '0'; + case 'a' ... 'f': return c - 'a' + 10; + default: return 0xff; // oh well... + } +} + +// assumes trusted data from same host +static kh_inline khint_t sha_hex_hash(const char *hex) +{ + khint_t ret = 0; + + for (size_t shift = 32; shift; ) + ret |= hex2uint(*hex++) << (shift -= 4); + + return ret; +} + +KHASHL_CMAP_INIT(KH_LOCAL, root2id_map, root2id, + const char *, const char *, + sha_hex_hash, kh_eq_str) + static void term_length_extract(struct req *req) { - req->lenv = (size_t *)calloc(req->pfxc, sizeof(size_t)); - if (!req->lenv) - EABORT("lenv = calloc(%d %zu)", req->pfxc, sizeof(size_t)); + req->lenv = (size_t *)xcalloc(req->pfxc, sizeof(size_t)); for (int i = 0; i < req->pfxc; i++) { char *pfx = req->pfxv[i]; // extract trailing digits as length: @@ -101,6 +128,7 @@ struct dump_roots_tmp { void *mm_ptr; char **entries; struct fbuf wbuf; + root2id_map *root2id; int root2off_fd; }; @@ -110,7 +138,8 @@ static void dump_roots_ensure(void *ptr) struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr; if (drt->root2off_fd >= 0) xclose(drt->root2off_fd); - hdestroy(); // idempotent + if (drt->root2id) + root2id_cm_destroy(drt->root2id); size_t size = off2size(drt->sb.st_size); if (drt->mm_ptr && munmap(drt->mm_ptr, size)) EABORT("BUG: munmap(%p, %zu)", drt->mm_ptr, size); @@ -118,23 +147,21 @@ static void dump_roots_ensure(void *ptr) fbuf_ensure(&drt->wbuf); } -static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc) +static bool root2offs_str(struct dump_roots_tmp *drt, + struct fbuf *root_offs, Xapian::Document *doc) { Xapian::TermIterator cur = doc->termlist_begin(); Xapian::TermIterator end = doc->termlist_end(); - ENTRY e, *ep; fbuf_init(root_offs); for (cur.skip_to("G"); cur != end; cur++) { std::string tn = *cur; if (!starts_with(&tn, 
"G", 1)) break; - union { const char *in; char *out; } u; - u.in = tn.c_str() + 1; - e.key = u.out; - ep = hsearch(e, FIND); - if (!ep) ABORT("hsearch miss `%s'", e.key); - // ep->data is a NUL-terminated string matching /[0-9]+/ + khint_t i = root2id_get(drt->root2id, tn.c_str() + 1); + if (i >= kh_end(drt->root2id)) + ABORT("kh get miss `%s'", tn.c_str() + 1); fputc(' ', root_offs->fp); - fputs((const char *)ep->data, root_offs->fp); + // kh_val(...) is a NUL-terminated string matching /[0-9]+/ + fputs(kh_val(drt->root2id, i), root_offs->fp); } fputc('\n', root_offs->fp); ERR_CLOSE(root_offs->fp, EXIT_FAILURE); // ENOMEM @@ -198,7 +225,7 @@ static enum exc_iter dump_roots_iter(struct req *req, CLEANUP_FBUF struct fbuf root_offs = {}; // " $ID0 $ID1 $IDx..\n" try { Xapian::Document doc = i->get_document(); - if (!root2offs_str(&root_offs, &doc)) + if (!root2offs_str(drt, &root_offs, &doc)) return ITER_ABORT; // bad request, abort for (int p = 0; p < req->pfxc; p++) dump_roots_term(req, p, drt, &root_offs, &doc); @@ -226,8 +253,7 @@ static bool cmd_dump_roots(struct req *req) if (fstat(drt.root2off_fd, &drt.sb)) // ENOMEM? 
err(EXIT_FAILURE, "fstat(%s)", root2off_file); // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0), - // so /32 overestimates the number of expected entries by - // ~%25 (as recommended by Linux hcreate(3) manpage) + // so /32 overestimates the number of expected entries size_t size = off2size(drt.sb.st_size); size_t est = (size / 32) + 1; //+1 for "\0" termination drt.mm_ptr = mmap(NULL, size, PROT_READ, @@ -236,20 +262,19 @@ static bool cmd_dump_roots(struct req *req) err(EXIT_FAILURE, "mmap(%zu, %s)", size, root2off_file); size_t asize = est * 2; if (asize < est) ABORT("too many entries: %zu", est); - drt.entries = (char **)calloc(asize, sizeof(char *)); - if (!drt.entries) - err(EXIT_FAILURE, "calloc(%zu * 2, %zu)", est, sizeof(char *)); + drt.entries = (char **)xcalloc(asize, sizeof(char *)); size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr, size, asize); if (tot <= 0) return false; // split2argv already warned on error - if (!hcreate(est)) - err(EXIT_FAILURE, "hcreate(%zu)", est); + drt.root2id = root2id_init(); + root2id_cm_resize(drt.root2id, est); for (size_t i = 0; i < tot; ) { - ENTRY e; - e.key = hsearch_enter_key(drt.entries[i++]); // dies on ENOMEM - e.data = drt.entries[i++]; - if (!hsearch(e, ENTER)) - err(EXIT_FAILURE, "hsearch(%s => %s, ENTER)", e.key, - (const char *)e.data); + int absent; + const char *key = drt.entries[i++]; + khint_t k = root2id_put(drt.root2id, key, &absent); + if (!absent) + err(EXIT_FAILURE, "put(%s => %s, ENTER)", + key, drt.entries[i]); + kh_val(drt.root2id, k) = drt.entries[i++]; } req->asc = true; req->sort_col = -1; diff --git a/lib/PublicInbox/xh_mset.h b/lib/PublicInbox/xh_mset.h index 4e97a284..db2692c9 100644 --- a/lib/PublicInbox/xh_mset.h +++ b/lib/PublicInbox/xh_mset.h @@ -3,49 +3,6 @@ // This file is only intended to be included by xap_helper.h // it implements pieces used by WWW, IMAP and lei -static void emit_doc_term(FILE *fp, const char *pfx, Xapian::Document *doc) -{ - Xapian::TermIterator cur = 
doc->termlist_begin(); - Xapian::TermIterator end = doc->termlist_end(); - size_t pfx_len = strlen(pfx); - - for (cur.skip_to(pfx); cur != end; cur++) { - std::string tn = *cur; - if (!starts_with(&tn, pfx, pfx_len)) break; - fputc(0, fp); - fwrite(tn.data(), tn.size(), 1, fp); - } -} - -static enum exc_iter mset_iter(const struct req *req, FILE *fp, off_t off, - Xapian::MSetIterator *i) -{ - try { - fprintf(fp, "%llu", (unsigned long long)(*(*i))); // get_docid - if (req->emit_percent) - fprintf(fp, "%c%d", 0, i->get_percent()); - if (req->pfxc || req->emit_docdata) { - Xapian::Document doc = i->get_document(); - for (int p = 0; p < req->pfxc; p++) - emit_doc_term(fp, req->pfxv[p], &doc); - if (req->emit_docdata) { - std::string d = doc.get_data(); - fputc(0, fp); - fwrite(d.data(), d.size(), 1, fp); - } - } - fputc('\n', fp); - } catch (const Xapian::DatabaseModifiedError & e) { - req->srch->db->reopen(); - if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko"); - return ITER_RETRY; - } catch (const Xapian::DocNotFoundError & e) { // oh well... - warnx("doc not found: %s", e.get_description().c_str()); - if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko"); - } - return ITER_OK; -} - #ifndef WBUF_FLUSH_THRESHOLD # define WBUF_FLUSH_THRESHOLD (BUFSIZ - 1000) #endif @@ -63,7 +20,9 @@ static bool cmd_mset(struct req *req) Xapian::MSet mset = req->code_search ? 
commit_mset(req, qry_str) : mail_mset(req, qry_str); fbuf_init(&wbuf); - fprintf(wbuf.fp, "mset.size=%llu\n", (unsigned long long)mset.size()); + fprintf(wbuf.fp, "mset.size=%llu .get_matches_estimated=%llu\n", + (unsigned long long)mset.size(), + (unsigned long long)mset.get_matches_estimated()); int fd = fileno(req->fp[0]); for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { off_t off = ftello(wbuf.fp); @@ -82,12 +41,10 @@ static bool cmd_mset(struct req *req) if (fseeko(wbuf.fp, 0, SEEK_SET)) EABORT("fseeko"); off = 0; } - for (int t = 10; t > 0; --t) - switch (mset_iter(req, wbuf.fp, off, &i)) { - case ITER_OK: t = 0; break; // leave inner loop - case ITER_RETRY: break; // continue for-loop - case ITER_ABORT: return false; // error - } + fprintf(wbuf.fp, "%llu" "%c" "%d" "%c" "%llu\n", + (unsigned long long)(*i), // get_docid + 0, i.get_percent(), + 0, (unsigned long long)i.get_rank()); } off_t off = ftello(wbuf.fp); if (off < 0) EABORT("ftello"); |