diff options
Diffstat (limited to 'lib/PublicInbox/Mbox.pm')
-rw-r--r-- | lib/PublicInbox/Mbox.pm | 165 |
1 files changed, 91 insertions, 74 deletions
diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm index b977308d..82fba5c6 100644 --- a/lib/PublicInbox/Mbox.pm +++ b/lib/PublicInbox/Mbox.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # Streaming interface for mboxrd HTTP responses @@ -19,13 +19,10 @@ sub getline { my $smsg = $ctx->{smsg} or return; my $ibx = $ctx->{ibx}; my $eml = delete($ctx->{eml}) // $ibx->smsg_eml($smsg) // return; - my $n = $ctx->{smsg} = $ibx->over->next_by_mid(@{$ctx->{next_arg}}); - $ctx->zmore(msg_hdr($ctx, $eml)); - if ($n) { - $ctx->translate(msg_body($eml)); + if (($ctx->{smsg} = $ibx->over->next_by_mid(@{$ctx->{next_arg}}))) { + $ctx->translate(msg_hdr($ctx, $eml), msg_body($eml)); } else { # last message - $ctx->zmore(msg_body($eml)); - $ctx->zflush; + $ctx->zflush(msg_hdr($ctx, $eml), msg_body($eml)); } } @@ -34,8 +31,8 @@ sub async_next { my ($http) = @_; # PublicInbox::HTTP my $ctx = $http->{forward} or return; # client aborted eval { - my $smsg = $ctx->{smsg} or return $ctx->close; - $ctx->smsg_blob($smsg); + my $smsg = $ctx->{smsg} // return $ctx->close; + $ctx->smsg_blob($smsg) if $smsg; }; warn "E: $@" if $@; } @@ -46,8 +43,7 @@ sub async_eml { # for async_blob_cb # next message $ctx->{smsg} = $ctx->{ibx}->over->next_by_mid(@{$ctx->{next_arg}}); local $ctx->{eml} = $eml; # for mbox_hdr - $ctx->zmore(msg_hdr($ctx, $eml)); - $ctx->write(msg_body($eml)); + $ctx->write(msg_hdr($ctx, $eml), msg_body($eml)); } sub mbox_hdr ($) { @@ -83,7 +79,6 @@ sub no_over_raw ($) { # /$INBOX/$MESSAGE_ID/raw sub emit_raw { my ($ctx) = @_; - $ctx->{base_url} = $ctx->{ibx}->base_url($ctx->{env}); my $over = $ctx->{ibx}->over or return no_over_raw($ctx); my ($id, $prev); my $mip = $ctx->{next_arg} = [ $ctx->{mid}, \$id, \$prev ]; @@ -94,17 +89,15 @@ sub emit_raw { sub msg_hdr ($$) { my ($ctx, $eml) = @_; - my $header_obj = $eml->header_obj; - # drop potentially confusing headers, ssoma already should've dropped - # Lines and Content-Length - foreach my $d (qw(Lines Bytes Content-Length Status)) { - $header_obj->header_set($d); + # drop potentially confusing headers, various importers should've + # already dropped these, but we can't trust stuff we've cloned + for my $d (qw(Lines Bytes Content-Length Status)) { + $eml->header_set($d); } - my $crlf = $header_obj->crlf; - my $buf = $header_obj->as_string; - # fixup old bug from import (pre-a0c07cba0e5d8b6a) - $buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s; + my $crlf = $eml->crlf; + my $buf = $eml->header_obj->as_string; + PublicInbox::Eml::strip_from($buf); "From mboxrd\@z Thu Jan 1 00:00:00 1970" . $crlf . $buf . $crlf; } @@ -166,6 +159,7 @@ sub all_ids_cb { } $ctx->{ids} = $ids = $over->ids_after(\($ctx->{prev})); } while (@$ids); + undef; } sub mbox_all_ids { @@ -182,75 +176,98 @@ sub mbox_all_ids { PublicInbox::MboxGz::mbox_gz($ctx, \&all_ids_cb, 'all'); } -sub results_cb { - my ($ctx) = @_; - my $over = $ctx->{ibx}->over or return $ctx->gone('over'); - while (1) { - while (defined(my $num = shift(@{$ctx->{ids}}))) { - my $smsg = $over->get_art($num) or next; - return $smsg; - } - # refill result set, deprioritize since there's many results - my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search'); - my $mset = $srch->mset($ctx->{query}, $ctx->{qopts}); - my $size = $mset->size or return; - $ctx->{qopts}->{offset} += $size; - $ctx->{ids} = $srch->mset_to_artnums($mset, $ctx->{qopts}); - $ctx->{-low_prio} = 1; +my $refill_ids_cb = sub { # async_mset cb + my ($ctx, $http, $mset, $err) = @_; + $http = undef unless $ctx->{-really_async}; + if ($err) { + warn "E: $err"; + $ctx->close if $http; # our async httpd + return; } -} - -sub results_thread_cb { - my ($ctx) = @_; + # refill result set, deprioritize since there's many results + my $size = $mset->size or do { + $ctx->close if $http; + $ctx->{-mbox_done} = 1; + return; + }; + $ctx->{qopts}->{offset} += $size; + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); + $ctx->{-low_prio} = 1; # true + return if !$http; + eval { + my $smsg = results_cb($ctx) // return $ctx->close; + return if !$smsg; # '' wait for async_mset + $ctx->smsg_blob($ctx->{smsg} = $smsg); + }; + warn "E: $@" if $@; +}; +sub results_cb { # async_next or MboxGz->getline cb + my ($ctx, $http) = @_; my $over = $ctx->{ibx}->over or return $ctx->gone('over'); while (1) { - while (defined(my $num = shift(@{$ctx->{xids}}))) { + my $ids = $ctx->{xids} // $ctx->{ids}; + while (defined(my $num = shift(@$ids))) { my $smsg = $over->get_art($num) or next; return $smsg; } - - # refills ctx->{xids} - next if $over->expand_thread($ctx); - - # refill result set, deprioritize since there's many results - my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search'); - my $mset = $srch->mset($ctx->{query}, $ctx->{qopts}); - my $size = $mset->size or return; - $ctx->{qopts}->{offset} += $size; - $ctx->{ids} = $srch->mset_to_artnums($mset, $ctx->{qopts}); - $ctx->{-low_prio} = 1; + next if $ctx->{xids} && $over->expand_thread($ctx); + return '' if $ctx->{srch}->async_mset(@$ctx{qw(query qopts)}, + $refill_ids_cb, $ctx, $http); + return if $ctx->{-mbox_done}; } +} +sub mbox_qry_cb { # async_mset cb + my ($ctx, $q, $mset, $err) = @_; + my $wcb = delete $ctx->{wcb}; + if ($err) { + warn "E: $err"; + return $wcb->([500, [qw(Content-Type text/plain)], + [ "Internal server error\n" ]]) + } + $ctx->{qopts}->{offset} = $mset->size or + return $wcb->([404, [qw(Content-Type text/plain)], + ["No results found\n"]]); + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); + my $fn; + if ($q->{t} && $ctx->{srch}->has_threadid) { + $ctx->{xids} = []; # triggers over->expand_thread + $fn = "results-thread-$ctx->{query}"; + } else { + $fn = "results-$ctx->{query}"; + } + require PublicInbox::MboxGz; + my $res = PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); } sub mbox_all { my ($ctx, $q) = @_; - my $q_string = $q->{'q'}; - return mbox_all_ids($ctx) if $q_string !~ /\S/; - my $srch = $ctx->{ibx}->isrch or + my $qstr = $q->{'q'}; + return mbox_all_ids($ctx) if $qstr !~ /\S/; + my $srch = $ctx->{srch} = $ctx->{ibx}->isrch or return PublicInbox::WWW::need($ctx, 'Search'); - my $over = $ctx->{ibx}->over or - return PublicInbox::WWW::need($ctx, 'Overview'); + my $opt = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC - my $qopts = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC - $qopts->{threads} = 1 if $q->{t}; - $srch->query_approxidate($ctx->{ibx}->git, $q_string); - my $mset = $srch->mset($q_string, $qopts); - $qopts->{offset} = $mset->size or - return [404, [qw(Content-Type text/plain)], - ["No results found\n"]]; - $ctx->{query} = $q_string; - $ctx->{ids} = $srch->mset_to_artnums($mset, $qopts); - require PublicInbox::MboxGz; - my $fn; - if ($q->{t} && $srch->has_threadid) { - $fn = 'results-thread-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_thread_cb, $fn); - } else { - $fn = 'results-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + # {threadid} limits results to a given thread + # {threads} collapses results from messages in the same thread, + # allowing us to use ->expand_thread w/o duplicates in our own code + if (defined($ctx->{mid})) { + my $over = ($ctx->{ibx}->{isrch} ? + $ctx->{ibx}->{isrch}->{es}->over : + $ctx->{ibx}->over) or + return PublicInbox::WWW::need($ctx, 'Overview'); + $opt->{threadid} = $over->mid2tid($ctx->{mid}); } + $opt->{threads} = 1 if $q->{t}; + $srch->query_approxidate($ctx->{ibx}->git, $qstr); + $ctx->{query} = $qstr; + sub { # called by PSGI server + $ctx->{wcb} = $_[0]; # PSGI server supplied write cb + $srch->async_mset($qstr, $opt, \&mbox_qry_cb, $ctx, $q) and + $ctx->{-really_async} = 1; + }; } 1; |