about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2019-05-23 09:37:03 +0000
committer Eric Wong <e@80x24.org> 2019-05-23 17:43:51 +0000
commit 4a918d9c17b045ad8151c86b09666799c354908f (patch)
tree 99411ea0786083371ecaf8f95aa51324258c471d
parent ea3528394f0ee5c47f9b94a4d92d129c709990df (diff)
download public-inbox-4a918d9c17b045ad8151c86b09666799c354908f.tar.gz
With the new --jobs/-j switch, we don't have to be tied to the
number of partitions in case we made a bad choice at
initialization.  This doesn't affect reindexing, but the copying
phase is already intensive.

Also, optimize away the extra process when we only have a single
job, since a single job won't parallelize.

The wording for the (v2) reindexing phase could be improved
later.  I also plan to allow repartitioning of existing
Xapian DBs.
-rw-r--r-- lib/PublicInbox/Xapcmd.pm 44
1 file changed, 28 insertions, 16 deletions
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index 5b6d06b8..a294d539 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -13,7 +13,7 @@ use File::Basename qw(dirname);
 # support testing with dev versions of Xapian which installs
 # commands with a version number suffix (e.g. "xapian-compact-1.5")
 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
-our @COMPACT_OPT = qw(quiet|q blocksize|b=s no-full|n fuller|F);
+our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
 
 sub commit_changes ($$$) {
         my ($ibx, $tmp, $opt) = @_;
@@ -54,8 +54,7 @@ sub cb_spawn {
         my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
         defined(my $pid = fork) or die "fork: $!";
         return $pid if $pid > 0;
-        eval { $cb->($args, $opt) };
-        die $@ if $@;
+        $cb->($args, $opt);
         exit 0;
 }
 
@@ -103,6 +102,31 @@ sub same_fs_or_die ($$) {
         die "$x and $y reside on different filesystems\n";
 }
 
+sub process_queue {
+        my ($queue, $cb, $max, $opt) = @_;
+        if ($max <= 1) {
+                while (defined(my $args = shift @$queue)) {
+                        $cb->($args, $opt);
+                }
+                return;
+        }
+
+        # run in parallel:
+        my %pids;
+        while (@$queue) {
+                while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
+                        my $args = shift @$queue;
+                        $pids{cb_spawn($cb, $args, $opt)} = $args;
+                }
+
+                while (scalar keys %pids) {
+                        my $pid = waitpid(-1, 0);
+                        my $args = delete $pids{$pid};
+                        die join(' ', @$args)." failed: $?\n" if $?;
+                }
+        }
+}
+
 sub run {
         my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
         my $cb = \&${\"PublicInbox::Xapcmd::$task"};
@@ -163,19 +187,7 @@ sub run {
                 }
 
                 delete($ibx->{$_}) for (qw(mm over search)); # cleanup
-                my %pids;
-                while (@q) {
-                        while (scalar(keys(%pids)) < $max && scalar(@q)) {
-                                my $args = shift @q;
-                                $pids{cb_spawn($cb, $args, $opt)} = $args;
-                        }
-
-                        while (scalar keys %pids) {
-                                my $pid = waitpid(-1, 0);
-                                my $args = delete $pids{$pid};
-                                die join(' ', @$args)." failed: $?\n" if $?;
-                        }
-                }
+                process_queue(\@q, $cb, $max, $opt);
                 commit_changes($ibx, $tmp, $opt);
         });
 }