From 7b654d175cf2e31b4354929ea678563f534947e5 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 30 Jan 2023 04:30:57 +0000 Subject: ipc: drop awaitpid_init to avoid circular refs This brings t/lei-index.t back down from ~8 to ~3s. I didn't notice this before was because the LeiNoteEvent timer was firing every 5s and clearing circular refs and parallel testing meant the delay got hidden. Fixes: 4a2a95bbc78f99c8 (ipc+lei: switch to awaitpid, 2023-01-17) --- lib/PublicInbox/IPC.pm | 32 ++++++++++++-------------------- lib/PublicInbox/LEI.pm | 6 +++--- lib/PublicInbox/LeiMirror.pm | 2 +- lib/PublicInbox/LeiStore.pm | 7 +++---- lib/PublicInbox/LeiToMail.pm | 6 +++--- lib/PublicInbox/LeiUp.pm | 2 +- lib/PublicInbox/LeiXSearch.pm | 10 +++++----- 7 files changed, 28 insertions(+), 37 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index edc5ba64..548a72eb 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -96,7 +96,7 @@ sub ipc_worker_loop ($$$) { # starts a worker if Sereal or Storable is installed sub ipc_worker_spawn { - my ($self, $ident, $oldset, $fields) = @_; + my ($self, $ident, $oldset, $fields, @cb_args) = @_; return if ($self->{-ipc_ppid} // -1) == $$; # idempotent delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)}); pipe(my ($r_req, $w_req)) or die "pipe: $!"; @@ -133,28 +133,20 @@ sub ipc_worker_spawn { $self->{-ipc_req} = $w_req; $self->{-ipc_res} = $r_res; $self->{-ipc_ppid} = $$; - awaitpid($pid, \&ipc_worker_reap, $self); + awaitpid($pid, \&ipc_worker_reap, $self, @cb_args); $self->{-ipc_pid} = $pid; } sub ipc_worker_reap { # awaitpid callback - my ($pid, $self) = @_; + my ($pid, $self, $cb, @args) = @_; delete $self->{-wq_workers}->{$pid}; - if (my $cb_args = $self->{-reap_do}) { - return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]); - } + return $cb->($pid, $self, @args) if $cb; return if !$?; my $s = $? & 127; # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13 } -# register wait workers -sub awaitpid_init { - my ($self, @cb_args) = @_; - $self->{-reap_do} = \@cb_args; -} - # for base class, override in sub classes sub ipc_atfork_prepare {} @@ -347,7 +339,6 @@ sub wq_do { sub prepare_nonblock { ($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0); - $_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock'; require PublicInbox::WQBlocked; } @@ -363,8 +354,8 @@ sub wq_nonblock_do { # always async } } -sub _wq_worker_start ($$$$) { - my ($self, $oldset, $fields, $one) = @_; +sub _wq_worker_start { + my ($self, $oldset, $fields, $one, @cb_args) = @_; my ($bcast1, $bcast2); $one or socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; @@ -395,13 +386,13 @@ sub _wq_worker_start ($$$$) { undef $end; # trigger exit } else { $self->{-wq_workers}->{$pid} = $bcast1; - awaitpid($pid, \&ipc_worker_reap, $self); + awaitpid($pid, \&ipc_worker_reap, $self, @cb_args); } } # starts workqueue workers if Sereal or Storable is installed sub wq_workers_start { - my ($self, $ident, $nr_workers, $oldset, $fields) = @_; + my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_; ($send_cmd && $recv_cmd && defined($SEQPACKET)) or return; return if $self->{-wq_s1}; # idempotent $self->{-wq_s1} = $self->{-wq_s2} = undef; @@ -414,7 +405,9 @@ sub wq_workers_start { $self->{-wq_ident} = $ident; my $one = $nr_workers == 1; $self->{-wq_nr_workers} = $nr_workers; - _wq_worker_start($self, $sigset, $fields, $one) for (1..$nr_workers); + for (1..$nr_workers) { + _wq_worker_start($self, $sigset, $fields, $one, @cb_args); + } PublicInbox::DS::sig_setmask($sigset) unless $oldset; $self->{-wq_ppid} = $$; } @@ -422,11 +415,10 @@ sub wq_workers_start { sub wq_close { my ($self) = @_; if (my $wqb = delete $self->{wqb}) { - $self->{-reap_do} or die 'BUG: {-reap_do} unset'; $wqb->enq_close; } delete @$self{qw(-wq_s1 -wq_s2)} or return; - return if $self->{-reap_do}; + return if ($self->{-wq_ppid} // -1) != $$; awaitpid($_) for keys %{$self->{-wq_workers}}; } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 6ad42111..ffd50db5 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -643,8 +643,8 @@ sub workers_start { my $end = $lei->pkt_op_pair; my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker"; $flds->{lei} = $lei; - $wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); - $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); + $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds, + $wq->can('_wq_done_wait') // \&wq_done_wait, $lei); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; @$end = (); @@ -1390,7 +1390,7 @@ sub DESTROY { # preserve $? for ->fail or ->x_it code } -sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init) +sub wq_done_wait { # awaitpid cb (via wq_eof) my ($pid, $wq, $lei) = @_; local $current_lei = $lei; my $err_type = $lei->{-err_type}; diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index 31013360..ec2b56df 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -31,7 +31,7 @@ sub keep_going ($) { $_[0]->{lei}->{opt}->{'keep-going'}); } -sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init) +sub _wq_done_wait { # awaitpid cb (via wq_eof) my ($pid, $mrr, $lei) = @_; if ($?) { $lei->child_error($?); diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 0ecf1388..fce15a72 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -604,8 +604,8 @@ sub recv_and_run { $self->SUPER::recv_and_run(@args); } -sub _sto_atexit { # awaitpid cb (via awaitpid_init) - my ($pid, $sto) = @_; +sub _sto_atexit { # awaitpid cb + my ($pid) = @_; warn "lei/store PID:$pid died \$?=$?\n" if $?; } @@ -620,12 +620,11 @@ sub write_prepare { # Mail we import into lei are private, so headers filtered out # by -mda for public mail are not appropriate local @PublicInbox::MDA::BAD_HEADERS = (); - $self->awaitpid_init(\&_sto_atexit); # outlives $lei $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, { lei => $lei, -err_wr => $w, to_close => [ $r ], - }); + }, \&_sto_atexit); require PublicInbox::LeiStoreErr; PublicInbox::LeiStoreErr->new($r, $lei); } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 6a4554e7..31eba794 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -652,7 +652,7 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub v2w_done_wait { # awaitpid cb (via awaitpid_init) +sub v2w_done_wait { # awaitpid cb my ($pid, $v2w, $lei) = @_; $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; } @@ -679,8 +679,8 @@ sub _pre_augment_v2 { PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; - $v2w->awaitpid_init(\&v2w_done_wait, $lei); - $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}, + \&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm index 3e92242e..cd2337b4 100644 --- a/lib/PublicInbox/LeiUp.pm +++ b/lib/PublicInbox/LeiUp.pm @@ -165,7 +165,7 @@ sub _complete_up { # lei__complete hook map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei); } -sub _wq_done_wait { # awaitpid cb (via awaitpid_init) +sub _wq_done_wait { # awaitpid cb my ($pid, $wq, $lei) = @_; $lei->child_error($?, 'auth failure') if $? } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index f9aa870e..5965274c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -400,7 +400,7 @@ sub query_remote_mboxrd { sub git { $_[0]->{git} // die 'BUG: git uninitialized' } -sub xsearch_done_wait { # awaitpid cb (via awaitpid_init) +sub xsearch_done_wait { # awaitpid cb my ($pid, $wq, $lei) = @_; return if !$?; my $s = $? & 127; @@ -572,16 +572,16 @@ sub do_query { fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ]; } - $l2m->awaitpid_init(\&xsearch_done_wait, $lei); $l2m->wq_workers_start('lei2mail', undef, - $lei->oldset, { lei => $lei }); + $lei->oldset, { lei => $lei }, + \&xsearch_done_wait, $lei); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; delete $l2m->{au_peers}; } - $self->awaitpid_init(\&xsearch_done_wait, $lei); $self->wq_workers_start('lei_xsearch', undef, - $lei->oldset, { lei => $lei }); + $lei->oldset, { lei => $lei }, + \&xsearch_done_wait, $lei); my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); -- cgit v1.2.3-24-ge0c7