about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-01-30 04:30:57 +0000
committerEric Wong <e@80x24.org>2023-01-30 06:42:42 +0000
commit7b654d175cf2e31b4354929ea678563f534947e5 (patch)
tree3dcfb9dfc38e005c5aeeb6e673968610e6393e91
parentf9557e2c6b7510d278310066fe7c26f84cae6e1d (diff)
downloadpublic-inbox-7b654d175cf2e31b4354929ea678563f534947e5.tar.gz
This brings t/lei-index.t back down from ~8 to ~3s.  I didn't
notice this before was because the LeiNoteEvent timer was firing
every 5s and clearing circular refs and parallel testing meant
the delay got hidden.

Fixes: 4a2a95bbc78f99c8 (ipc+lei: switch to awaitpid, 2023-01-17)
-rw-r--r--lib/PublicInbox/IPC.pm32
-rw-r--r--lib/PublicInbox/LEI.pm6
-rw-r--r--lib/PublicInbox/LeiMirror.pm2
-rw-r--r--lib/PublicInbox/LeiStore.pm7
-rw-r--r--lib/PublicInbox/LeiToMail.pm6
-rw-r--r--lib/PublicInbox/LeiUp.pm2
-rw-r--r--lib/PublicInbox/LeiXSearch.pm10
7 files changed, 28 insertions, 37 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index edc5ba64..548a72eb 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -96,7 +96,7 @@ sub ipc_worker_loop ($$$) {
 
 # starts a worker if Sereal or Storable is installed
 sub ipc_worker_spawn {
-        my ($self, $ident, $oldset, $fields) = @_;
+        my ($self, $ident, $oldset, $fields, @cb_args) = @_;
         return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
         delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
         pipe(my ($r_req, $w_req)) or die "pipe: $!";
@@ -133,28 +133,20 @@ sub ipc_worker_spawn {
         $self->{-ipc_req} = $w_req;
         $self->{-ipc_res} = $r_res;
         $self->{-ipc_ppid} = $$;
-        awaitpid($pid, \&ipc_worker_reap, $self);
+        awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
         $self->{-ipc_pid} = $pid;
 }
 
 sub ipc_worker_reap { # awaitpid callback
-        my ($pid, $self) = @_;
+        my ($pid, $self, $cb, @args) = @_;
         delete $self->{-wq_workers}->{$pid};
-        if (my $cb_args = $self->{-reap_do}) {
-                return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
-        }
+        return $cb->($pid, $self, @args) if $cb;
         return if !$?;
         my $s = $? & 127;
         # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
         warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
 }
 
-# register wait workers
-sub awaitpid_init {
-        my ($self, @cb_args) = @_;
-        $self->{-reap_do} = \@cb_args;
-}
-
 # for base class, override in sub classes
 sub ipc_atfork_prepare {}
 
@@ -347,7 +339,6 @@ sub wq_do {
 
 sub prepare_nonblock {
         ($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
-        $_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock';
         require PublicInbox::WQBlocked;
 }
 
@@ -363,8 +354,8 @@ sub wq_nonblock_do { # always async
         }
 }
 
-sub _wq_worker_start ($$$$) {
-        my ($self, $oldset, $fields, $one) = @_;
+sub _wq_worker_start {
+        my ($self, $oldset, $fields, $one, @cb_args) = @_;
         my ($bcast1, $bcast2);
         $one or socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
                                                         die "socketpair: $!";
@@ -395,13 +386,13 @@ sub _wq_worker_start ($$$$) {
                 undef $end; # trigger exit
         } else {
                 $self->{-wq_workers}->{$pid} = $bcast1;
-                awaitpid($pid, \&ipc_worker_reap, $self);
+                awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
         }
 }
 
 # starts workqueue workers if Sereal or Storable is installed
 sub wq_workers_start {
-        my ($self, $ident, $nr_workers, $oldset, $fields) = @_;
+        my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_;
         ($send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
         return if $self->{-wq_s1}; # idempotent
         $self->{-wq_s1} = $self->{-wq_s2} = undef;
@@ -414,7 +405,9 @@ sub wq_workers_start {
         $self->{-wq_ident} = $ident;
         my $one = $nr_workers == 1;
         $self->{-wq_nr_workers} = $nr_workers;
-        _wq_worker_start($self, $sigset, $fields, $one) for (1..$nr_workers);
+        for (1..$nr_workers) {
+                _wq_worker_start($self, $sigset, $fields, $one, @cb_args);
+        }
         PublicInbox::DS::sig_setmask($sigset) unless $oldset;
         $self->{-wq_ppid} = $$;
 }
@@ -422,11 +415,10 @@ sub wq_workers_start {
 sub wq_close {
         my ($self) = @_;
         if (my $wqb = delete $self->{wqb}) {
-                $self->{-reap_do} or die 'BUG: {-reap_do} unset';
                 $wqb->enq_close;
         }
         delete @$self{qw(-wq_s1 -wq_s2)} or return;
-        return if $self->{-reap_do};
+        return if ($self->{-wq_ppid} // -1) != $$;
         awaitpid($_) for keys %{$self->{-wq_workers}};
 }
 
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 6ad42111..ffd50db5 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -643,8 +643,8 @@ sub workers_start {
         my $end = $lei->pkt_op_pair;
         my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
         $flds->{lei} = $lei;
-        $wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
-        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+                $wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
         delete $lei->{pkt_op_p};
         my $op_c = delete $lei->{pkt_op_c};
         @$end = ();
@@ -1390,7 +1390,7 @@ sub DESTROY {
         # preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+sub wq_done_wait { # awaitpid cb (via wq_eof)
         my ($pid, $wq, $lei) = @_;
         local $current_lei = $lei;
         my $err_type = $lei->{-err_type};
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 31013360..ec2b56df 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,7 +31,7 @@ sub keep_going ($) {
                 $_[0]->{lei}->{opt}->{'keep-going'});
 }
 
-sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+sub _wq_done_wait { # awaitpid cb (via wq_eof)
         my ($pid, $mrr, $lei) = @_;
         if ($?) {
                 $lei->child_error($?);
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0ecf1388..fce15a72 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -604,8 +604,8 @@ sub recv_and_run {
         $self->SUPER::recv_and_run(@args);
 }
 
-sub _sto_atexit { # awaitpid cb (via awaitpid_init)
-        my ($pid, $sto) = @_;
+sub _sto_atexit { # awaitpid cb
+        my ($pid) = @_;
         warn "lei/store PID:$pid died \$?=$?\n" if $?;
 }
 
@@ -620,12 +620,11 @@ sub write_prepare {
                 # Mail we import into lei are private, so headers filtered out
                 # by -mda for public mail are not appropriate
                 local @PublicInbox::MDA::BAD_HEADERS = ();
-                $self->awaitpid_init(\&_sto_atexit); # outlives $lei
                 $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
                                         lei => $lei,
                                         -err_wr => $w,
                                         to_close => [ $r ],
-                                });
+                                }, \&_sto_atexit);
                 require PublicInbox::LeiStoreErr;
                 PublicInbox::LeiStoreErr->new($r, $lei);
         }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 6a4554e7..31eba794 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -652,7 +652,7 @@ sub _do_augment_mbox {
         $dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # awaitpid cb (via awaitpid_init)
+sub v2w_done_wait { # awaitpid cb
         my ($pid, $v2w, $lei) = @_;
         $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
 }
@@ -679,8 +679,8 @@ sub _pre_augment_v2 {
         PublicInbox::InboxWritable->new($ibx, @creat);
         $ibx->init_inbox if @creat;
         my $v2w = $ibx->importer;
-        $v2w->awaitpid_init(\&v2w_done_wait, $lei);
-        $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+        $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei},
+                                \&v2w_done_wait, $lei);
         $lei->{v2w} = $v2w;
         return if !$lei->{opt}->{shared};
         my $d = "$lei->{ale}->{git}->{git_dir}/objects";
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 3e92242e..cd2337b4 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -165,7 +165,7 @@ sub _complete_up { # lei__complete hook
         map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
 }
 
-sub _wq_done_wait { # awaitpid cb (via awaitpid_init)
+sub _wq_done_wait { # awaitpid cb
         my ($pid, $wq, $lei) = @_;
         $lei->child_error($?, 'auth failure') if $?
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index f9aa870e..5965274c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -400,7 +400,7 @@ sub query_remote_mboxrd {
 
 sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
 
-sub xsearch_done_wait { # awaitpid cb (via awaitpid_init)
+sub xsearch_done_wait { # awaitpid cb
         my ($pid, $wq, $lei) = @_;
         return if !$?;
         my $s = $? & 127;
@@ -572,16 +572,16 @@ sub do_query {
                         fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                         $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
                 }
-                $l2m->awaitpid_init(\&xsearch_done_wait, $lei);
                 $l2m->wq_workers_start('lei2mail', undef,
-                                        $lei->oldset, { lei => $lei });
+                                        $lei->oldset, { lei => $lei },
+                                        \&xsearch_done_wait, $lei);
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                 fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                 delete $l2m->{au_peers};
         }
-        $self->awaitpid_init(\&xsearch_done_wait, $lei);
         $self->wq_workers_start('lei_xsearch', undef,
-                                $lei->oldset, { lei => $lei });
+                                $lei->oldset, { lei => $lei },
+                                \&xsearch_done_wait, $lei);
         my $op_c = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};
         @$end = ();