diff options
-rw-r--r-- | lib/PublicInbox/IPC.pm | 5 | ||||
-rw-r--r-- | lib/PublicInbox/LeiQuery.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 4 |
3 files changed, 9 insertions, 6 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 2aeb6462..1fa67d00 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -341,7 +341,7 @@ sub wq_workers_start { socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; $self->ipc_atfork_prepare; - $nr_workers //= 4; + $nr_workers //= $self->{-wq_nr_workers}; $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS; my $sigset = $oldset // PublicInbox::DS::block_signals(); $self->{-wq_workers} = {}; @@ -354,6 +354,7 @@ sub wq_workers_start { sub wq_worker_incr { # SIGTTIN handler my ($self, $oldset, $fields) = @_; $self->{-wq_s2} or return; + die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers}; return if wq_workers($self) >= $WQ_MAX_WORKERS; $self->ipc_atfork_prepare; my $sigset = $oldset // PublicInbox::DS::block_signals(); @@ -369,6 +370,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait sub wq_worker_decr { # SIGTTOU handler, kills first idle worker my ($self) = @_; return unless wq_workers($self); + die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers}; $self->wq_io_do('wq_exit'); # caller must call wq_worker_decr_wait in main loop } @@ -376,6 +378,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker sub wq_worker_decr_wait { my ($self, $timeout, $cb, @args) = @_; return if $self->{-wq_ppid} != $$; # can't reap siblings or parents + die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers}; my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1'; vec(my $rin = '', fileno($s1), 1) = 1; select(my $rout = $rin, undef, undef, $timeout) or diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index eaf91f2e..398f834f 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -90,14 +90,14 @@ sub lei_q { return $self->fail("`$xj' search jobs must be >= 1"); } $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY" - my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists + my $nproc = $lxs->detect_nproc // 1; # don't memoize, schedtool(1) exists $xj = $nproc if $xj > $nproc; - $lxs->{jobs} = $xj; + $lxs->{-wq_nr_workers} = $xj; if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) { return $self->fail("`$mj' writer jobs must be >= 1"); } - $self->{l2m}->{jobs} = ($mj // $nproc) if $self->{l2m}; PublicInbox::LeiOverview->new($self) or return; + $self->{l2m}->{-wq_nr_workers} = ($mj // $nproc) if $self->{l2m}; my %mset_opt = map { $_ => $opt->{$_} } qw(threads limit offset); $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index a319b75f..524f4d1c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -407,7 +407,7 @@ sub do_query { if ($lei->{opt}->{augment} && delete $lei->{early_mua}) { $lei->start_mua; } - $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, + $l2m->wq_workers_start('lei2mail', undef, $lei->oldset, { lei => $lei }); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; # 1031: F_SETPIPE_SZ @@ -418,7 +418,7 @@ sub do_query { # delete until all lei2mail + lei_xsearch workers are reaped $lei->{git_tmp} = $self->{git_tmp} = git_tmp($self); } - $self->wq_workers_start('lei_xsearch', $self->{jobs}, + $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; |