* [PATCH 1/4] req_res: close if tied yahns_client is closed
@ 2016-06-05 21:23 Eric Wong
2016-06-05 21:23 ` [PATCH 2/4] req_res: store proxy_pass object here, instead Eric Wong
` (2 more replies)
0 siblings, 3 replies; 4+ messages in thread
From: Eric Wong @ 2016-06-05 21:23 UTC
To: spew
---
lib/yahns/req_res.rb | 1 +
1 file changed, 1 insertion(+)
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index dd4ec87..9e1a4d6 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -67,6 +67,7 @@ def yahns_step # yahns event loop entry point
return c.proxy_response_start(res, resbuf, req, self)
when Yahns::WbufCommon # streaming/buffering the response body
+ return if c.closed?
# we assign wbuf for rescue below:
return c.proxy_response_finish(req, wbuf = resbuf, self)
* [PATCH 2/4] req_res: store proxy_pass object here, instead
From: Eric Wong @ 2016-06-05 21:23 UTC
To: spew
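We cannot rely on env still being available after proxy_wait_next,
so pass the proxy_pass object to req_start and read it back from
the ReqRes instead of env['yahns.proxy_pass'].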
---
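Note (not part of the commit message): a minimal sketch of the idea,
assuming nothing beyond what the diff below shows.  FakeReqRes and
FakeProxyPass are hypothetical stand-ins for Yahns::ReqRes and
Yahns::ProxyPass; only the five-argument req_start and the
proxy_pass reader mirror the patch.

```ruby
# Hypothetical stand-ins, not yahns classes.
FakeProxyPass = Struct.new(:proxy_buffering, :response_headers)

class FakeReqRes
  attr_reader :proxy_pass

  # mirrors the new req_start signature: the proxy_pass object travels
  # with the upstream connection instead of being looked up in env
  def req_start(client, req, input, chunked, proxy_pass)
    @yahns_client = client
    @rrstate = input ? [ req, input, chunked ] : req
    @proxy_pass = proxy_pass
  end
end

env = { 'yahns.proxy_pass' => :old_style } # what the old code relied on
rr = FakeReqRes.new
rr.req_start(:client, "GET / HTTP/1.1\r\n\r\n", nil, false,
             FakeProxyPass.new(true, {}))
env.clear # simulate env no longer being usable
STILL_BUFFERING = rr.proxy_pass.proxy_buffering # reachable without env
```

The setting stays reachable from the object that owns the upstream
socket, whatever happens to env in the meantime.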
lib/yahns/proxy_http_response.rb | 4 ++--
lib/yahns/proxy_pass.rb | 3 +--
lib/yahns/req_res.rb | 4 +++-
3 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 2968062..765fe14 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -27,7 +27,7 @@ def proxy_write(wbuf, buf, req_res)
when String, Array # partial write, hope the skb grows
buf = rv
when :wait_writable, :wait_readable
- if @hs.env['yahns.proxy_pass'].proxy_buffering
+ if req_res.proxy_pass.proxy_buffering
body = nil
alive = req_res.alive
else
@@ -88,7 +88,7 @@ def proxy_res_headers(res, req_res)
flags = MSG_DONTWAIT
alive = @hs.next? && self.class.persistent_connections
term = false
- response_headers = env['yahns.proxy_pass'].response_headers
+ response_headers = req_res.proxy_pass.response_headers
res = "HTTP/1.1 #{msg ? %Q(#{code} #{msg}) : status}\r\n".dup
headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index ed37da5..fcd0cf7 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -89,10 +89,9 @@ def call(env)
ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
- env['yahns.proxy_pass'] = self
# finally, prepare to emit the headers
- rr.req_start(c, req << "\r\n".freeze, input, chunked)
+ rr.req_start(c, req << "\r\n".freeze, input, chunked, self)
# this probably breaks fewer middlewares than returning whatever else...
[ 500, [], [] ]
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index 9e1a4d6..1776f18 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -11,12 +11,14 @@ class Yahns::ReqRes < Kgio::Socket # :nodoc:
attr_writer :paused
attr_accessor :proxy_trailers
attr_accessor :alive
+ attr_reader :proxy_pass
- def req_start(c, req, input, chunked)
+ def req_start(c, req, input, chunked, proxy_pass)
@hdr = @resbuf = nil
@yahns_client = c
@paused = false
@rrstate = input ? [ req, input, chunked ] : req
+ @proxy_pass = proxy_pass
Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
end
* [PATCH 3/4] proxy_pass: redo wbuf WIP
From: Eric Wong @ 2016-06-05 21:23 UTC
To: spew
---
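Note (not part of the commit message): the new Yahns::WbufProxy keeps
unwritten response bytes in a String instead of a temporary file.
Below is a self-contained sketch of that kind of in-memory write
buffer.  It is not the code from this patch: FakeClient, MemWbuf,
trywrite and drain are hypothetical names, and trywrite only imitates
the kgio_trywrite return convention (nil on a full write, a String
with the unwritten remainder on a partial write, :wait_writable when
nothing can be written).

```ruby
# Hypothetical stand-in for a non-blocking client socket.
class FakeClient
  attr_reader :written

  def initialize(capacity)
    @capacity = capacity # bytes accepted before "blocking"
    @written = String.new
  end

  def drain(nbytes) # pretend the peer consumed some data
    @capacity += nbytes
  end

  def trywrite(buf)
    return :wait_writable if @capacity == 0
    n = [ buf.bytesize, @capacity ].min
    @written << buf.byteslice(0, n)
    @capacity -= n
    n == buf.bytesize ? nil : buf.byteslice(n, buf.bytesize - n)
  end
end

# Hypothetical in-memory write buffer, loosely shaped like WbufProxy.
class MemWbuf
  attr_reader :busy

  def initialize
    @busy = false
    @buf = String.new
  end

  def wbuf_write(client, data)
    @buf << (Array === data ? data.join : data)
    do_write(client)
  end

  def do_write(client)
    loop do
      case rv = client.trywrite(@buf)
      when String then @buf = rv         # partial write, retry remainder
      when Symbol then return @busy = rv # keep @buf for the next flush
      when nil
        @buf = String.new
        return @busy = false
      end
    end
  end
end

client = FakeClient.new(4)
wbuf = MemWbuf.new
FIRST = wbuf.wbuf_write(client, "hello world") # client accepts 4 bytes
client.drain(100)
SECOND = wbuf.do_write(client)                 # the rest goes out
```

While busy is set, the caller would stop reading from upstream and
wait for the client to become writable, which is what lets the test
below expect no unlinked temporary files in the unbuffered process.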
lib/yahns/proxy_http_response.rb | 80 +++++++++++++++++++-----------------
lib/yahns/req_res.rb | 26 +++---------
lib/yahns/wbuf.rb | 2 +-
lib/yahns/wbuf_proxy.rb | 48 ++++++++++++++++++++++
test/test_proxy_pass_no_buffering.rb | 19 ++++-----
5 files changed, 107 insertions(+), 68 deletions(-)
create mode 100644 lib/yahns/wbuf_proxy.rb
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 765fe14..0c3b6d8 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -3,20 +3,30 @@
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
+require_relative 'wbuf_proxy'
+
# loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for
# constants.
module Yahns::HttpResponse # :nodoc:
# switch and yield
def proxy_unbuffer(wbuf)
- wbuf.body.resbuf = @state = wbuf
+ @state = wbuf
tc = Thread.current
tc[:yahns_fdmap].remember(self) # Yahns::HttpClient
- tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_writable ?
- Yahns::Queue::QEV_WR : Yahns::Queue::QEV_RD)
+ tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ?
+ Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR)
:ignore
end
+ def wbuf_alloc(req_res, busy)
+ if req_res.proxy_pass.proxy_buffering
+ Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir, busy)
+ else
+ Yahns::WbufProxy.new(req_res)
+ end
+ end
+
# write everything in buf to our client socket (or wbuf, if it exists)
# it may return a newly-created wbuf or nil
def proxy_write(wbuf, buf, req_res)
@@ -27,15 +37,7 @@ def proxy_write(wbuf, buf, req_res)
when String, Array # partial write, hope the skb grows
buf = rv
when :wait_writable, :wait_readable
- if req_res.proxy_pass.proxy_buffering
- body = nil
- alive = req_res.alive
- else
- req_res.paused = true
- body = req_res
- alive = :ignore
- end
- wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, rv)
+ wbuf = req_res.resbuf ||= wbuf_alloc(req_res, rv)
break
end while true
end
@@ -44,7 +46,7 @@ def proxy_write(wbuf, buf, req_res)
wbuf.busy ? wbuf : nil
end
- def proxy_err_response(code, req_res, exc, wbuf)
+ def proxy_err_response(code, req_res, exc)
logger = @hs.env['rack.logger']
case exc
when nil
@@ -68,13 +70,12 @@ def proxy_err_response(code, req_res, exc, wbuf)
nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
ensure
- wbuf.wbuf_abort if wbuf
+ wbuf = req_res.resbuf
+ wbuf.wbuf_abort if wbuf.respond_to?(:wbuf_abort)
end
- def wait_on_upstream(req_res, wbuf)
- req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, req_res.alive,
- self.class.output_buffer_tmpdir,
- false)
+ def wait_on_upstream(req_res)
+ req_res.resbuf ||= wbuf_alloc(req_res, false)
:wait_readable # self remains in :ignore, wait on upstream
end
@@ -135,18 +136,19 @@ def proxy_res_headers(res, req_res)
flags = MSG_DONTWAIT
res = rv # hope the skb grows
when :wait_writable, :wait_readable # highly unlikely in real apps
- wbuf = proxy_write(nil, res, req_res)
- break # keep buffering as much as possible
+ proxy_write(nil, res, req_res)
+ break # keep buffering body...
end while true
req_res.alive = alive
- [ wbuf, have_body ]
+ have_body
end
- def proxy_read_body(tip, kcar, req_res, wbuf)
+ def proxy_read_body(tip, kcar, req_res)
chunk = ''.dup if kcar.chunked?
len = kcar.body_bytes_left
rbuf = Thread.current[:yahns_rbuf]
alive = req_res.alive
+ wbuf = req_res.resbuf
case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
when String
@@ -161,45 +163,46 @@ def proxy_read_body(tip, kcar, req_res, wbuf)
# else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
end
wbuf = proxy_write(wbuf, tmp, req_res)
- return proxy_unbuffer(wbuf) if wbuf && wbuf.body
chunk.clear if chunk
+ return proxy_unbuffer(wbuf) if Yahns::WbufProxy === wbuf
when nil # EOF
# HTTP/1.1 upstream, unexpected premature EOF:
- return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
+ return proxy_err_response(nil, req_res, nil) if len || chunk
# HTTP/1.0 upstream:
wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
- return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+ return proxy_unbuffer(wbuf) if Yahns::WbufProxy === wbuf
req_res.shutdown
break
when :wait_readable
- return wait_on_upstream(req_res, wbuf)
+ return wait_on_upstream(req_res)
end until kcar.body_eof?
if chunk
# tip is an empty array and becomes trailer storage
req_res.proxy_trailers = [ rbuf.dup, tip ]
- return proxy_read_trailers(kcar, req_res, wbuf)
+ return proxy_read_trailers(kcar, req_res)
end
proxy_busy_mod(wbuf, req_res)
end
- def proxy_read_trailers(kcar, req_res, wbuf)
+ def proxy_read_trailers(kcar, req_res)
chunk, tlr = req_res.proxy_trailers
rbuf = Thread.current[:yahns_rbuf]
+ wbuf = req_res.resbuf
until kcar.trailers(tlr, chunk)
case rv = req_res.kgio_tryread(0x2000, rbuf)
when String
chunk << rv
when :wait_readable
- return wait_on_upstream(req_res, wbuf)
+ return wait_on_upstream(req_res)
when nil # premature EOF
- return proxy_err_response(nil, req_res, nil, wbuf)
+ return proxy_err_response(nil, req_res, nil)
end # no loop here
end
wbuf = proxy_write(wbuf, trailer_out(tlr), req_res)
- return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+ return proxy_unbuffer(wbuf) if Yahns::WbufProxy === wbuf
proxy_busy_mod(wbuf, req_res)
end
@@ -208,23 +211,26 @@ def proxy_read_trailers(kcar, req_res, wbuf)
# returns :ignore if we yield control to the client(self)
# returns nil if completely done
def proxy_response_start(res, tip, kcar, req_res)
- wbuf, have_body = proxy_res_headers(res, req_res)
+ have_body = proxy_res_headers(res, req_res)
tip = tip.empty? ? [] : [ tip ]
if have_body
req_res.proxy_trailers = nil # define to avoid uninitialized warnings
- return proxy_read_body(tip, kcar, req_res, wbuf)
+ return proxy_read_body(tip, kcar, req_res)
end
- return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+
+ # unlikely
+ wbuf = req_res.resbuf
+ return proxy_unbuffer(wbuf) if Yahns::WbufProxy === wbuf
# all done reading response from upstream, req_res will be discarded
# when we return nil:
proxy_busy_mod(wbuf, req_res)
end
- def proxy_response_finish(kcar, wbuf, req_res)
- req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, wbuf)
- : proxy_read_body([], kcar, req_res, wbuf)
+ def proxy_response_finish(kcar, req_res)
+ req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res)
+ : proxy_read_body([], kcar, req_res)
end
def proxy_wait_next(qflags)
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index 1776f18..9bb8f35 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -7,8 +7,7 @@
require 'kgio'
class Yahns::ReqRes < Kgio::Socket # :nodoc:
- attr_writer :resbuf
- attr_writer :paused
+ attr_accessor :resbuf
attr_accessor :proxy_trailers
attr_accessor :alive
attr_reader :proxy_pass
@@ -16,23 +15,11 @@ class Yahns::ReqRes < Kgio::Socket # :nodoc:
def req_start(c, req, input, chunked, proxy_pass)
@hdr = @resbuf = nil
@yahns_client = c
- @paused = false
@rrstate = input ? [ req, input, chunked ] : req
@proxy_pass = proxy_pass
Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
end
- def close
- if @paused # called by wbuf_close_common as @body.close
- @paused = false
- # we must cleanup and set yahns_client state before queue_mod below:
- @yahns_client.hijack_cleanup
- Thread.current[:yahns_queue].queue_mod(self, Yahns::Queue::QEV_RD)
- else
- super
- end
- end
-
def yahns_step # yahns event loop entry point
c = @yahns_client
case req = @rrstate
@@ -55,7 +42,7 @@ def yahns_step # yahns event loop entry point
# continue looping in middle "case @resbuf" loop
when :wait_readable
return rv # spurious wakeup
- when nil then return c.proxy_err_response(502, self, nil, nil)
+ when nil then return c.proxy_err_response(502, self, nil)
end # NOT looping here
when String # continue reading trickled response headers from upstream
@@ -63,16 +50,15 @@ def yahns_step # yahns event loop entry point
case rv = kgio_tryread(0x2000, buf)
when String then res = req.headers(@hdr, resbuf << rv) and break
when :wait_readable then return rv
- when nil then return c.proxy_err_response(502, self, nil, nil)
+ when nil then return c.proxy_err_response(502, self, nil)
end while true
+ @resbuf = false
return c.proxy_response_start(res, resbuf, req, self)
when Yahns::WbufCommon # streaming/buffering the response body
- return if c.closed?
- # we assign wbuf for rescue below:
- return c.proxy_response_finish(req, wbuf = resbuf, self)
+ return c.proxy_response_finish(req, self)
end while true # case @resbuf
@@ -88,7 +74,7 @@ def yahns_step # yahns event loop entry point
when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
e.set_backtrace([])
end
- c.proxy_err_response(502, self, e, wbuf)
+ c.proxy_err_response(502, self, e)
end
def send_req_body_chunk(buf)
diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb
index e6c794a..f7b2ffa 100644
--- a/lib/yahns/wbuf.rb
+++ b/lib/yahns/wbuf.rb
@@ -30,7 +30,7 @@
# to be a scalability issue.
class Yahns::Wbuf # :nodoc:
include Yahns::WbufCommon
- attr_reader :body, :busy, :wbuf_persist
+ attr_reader :busy
def initialize(body, persist, tmpdir, busy)
@tmpio = nil
diff --git a/lib/yahns/wbuf_proxy.rb b/lib/yahns/wbuf_proxy.rb
new file mode 100644
index 0000000..ca77c10
--- /dev/null
+++ b/lib/yahns/wbuf_proxy.rb
@@ -0,0 +1,48 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2016 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative 'wbuf_common'
+
+class Yahns::WbufProxy # :nodoc:
+ include Yahns::WbufCommon
+ attr_reader :busy
+
+ def initialize(req_res)
+ @req_res = req_res
+ @busy = false
+ @buf = ''.dup
+ end
+
+ def wbuf_write(client, buf)
+ buf = buf.join if Array === buf
+ @buf << buf
+ do_write(client)
+ end
+
+ def do_write(client)
+ buf = @buf
+ case rv = client.kgio_trywrite(buf)
+ when String
+ buf.clear
+ buf = rv # continue looping
+ when :wait_writable, :wait_readable
+ @buf = buf
+ return @busy = rv
+ when nil
+ @buf = buf.clear
+ return @busy = false
+ end while true
+ end
+
+ # called by Yahns::HttpClient#step_write
+ def wbuf_flush(client)
+ sym = do_write(client) and return sym # :wait_writable/:wait_readable
+ client.hijack_cleanup
+ Thread.current[:yahns_queue].queue_mod(@req_res, Yahns::Queue::QEV_RD)
+ :ignore
+ rescue
+ @req_res.close
+ raise
+ end
+end
diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb
index 88b7c80..bf1c934 100644
--- a/test/test_proxy_pass_no_buffering.rb
+++ b/test/test_proxy_pass_no_buffering.rb
@@ -60,7 +60,7 @@ def test_proxy_pass_no_buffering
@srv2.close
cfg.instance_eval do
app(:rack, pxp) { listen "#{host}:#{port}" }
- stderr_path err.path
+ #stderr_path err.path
end
end
@@ -72,14 +72,13 @@ def test_proxy_pass_no_buffering
output_buffering false
listen "#{host2}:#{port2}"
end
- stderr_path err.path
+ #stderr_path err.path
end
end
s = TCPSocket.new(host, port)
req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \
"Connection: close\r\n\r\n"
s.write(req)
- bufs = []
sleep 1
10.times do
sleep 0.1
@@ -92,14 +91,10 @@ def test_proxy_pass_no_buffering
[ deleted1, deleted2 ].each do |ary|
ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
end
- assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
+ assert_equal 0, deleted1.size, "pid1=#{deleted1.inspect}"
assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
- bufs.push(deleted1[0])
end
end
- before = bufs.size
- bufs.uniq!
- assert bufs.size < before, 'unlinked buffer should not grow'
buf = ''.dup
slow = Digest::MD5.new
ft = Thread.new do
@@ -108,26 +103,30 @@ def test_proxy_pass_no_buffering
f.write(req)
b2 = ''.dup
check_headers(f)
+ nf = 0
begin
f.readpartial(1024 * 1024, b2)
+ nf += b2.bytesize
fast.update(b2)
rescue EOFError
f = f.close
end while f
b2.clear
- fast
+ [ nf, fast.hexdigest ]
end
Thread.abort_on_exception = true
check_headers(s)
+ n = 0
begin
s.readpartial(1024 * 1024, buf)
slow.update(buf)
+ n += buf.bytesize
sleep 0.01
rescue EOFError
s = s.close
end while s
ft.join(5)
- assert_equal slow.hexdigest, ft.value.hexdigest
+ assert_equal [n, slow.hexdigest ], ft.value
fast = Digest::MD5.new
f = TCPSocket.new(host, port)
* [PATCH 4/4] try_gzip_static: ENAMETOOLONG
From: Eric Wong @ 2016-06-05 21:23 UTC
To: spew
---
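Note (not part of the commit message): the change below stops logging
backtraces for SystemCallError subclasses such as Errno::ENAMETOOLONG,
presumably because the errno message alone identifies the problem.
A small sketch of the same condition, using only the stdlib; log_exc
and capture are hypothetical helpers, not part of try_gzip_static.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper mirroring the changed condition: backtraces are
# logged for ordinary exceptions and skipped for SystemCallError.
def log_exc(logger, exc)
  logger.warn(exc.message)
  if exc.respond_to?(:backtrace) && !(SystemCallError === exc)
    (exc.backtrace || []).each { |line| logger.warn(line) }
  end
end

# Hypothetical helper: run the block with a Logger writing to a String.
def capture
  io = StringIO.new
  yield Logger.new(io)
  io.string
end

# Only the message line is logged for an Errno exception:
ERRNO_LOG = capture { |l| log_exc(l, Errno::ENAMETOOLONG.new('x' * 16)) }
```

An exception that is not a SystemCallError still gets its backtrace
logged line by line, as before.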
extras/try_gzip_static.rb | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/extras/try_gzip_static.rb b/extras/try_gzip_static.rb
index 0d5d63b..31c1aa1 100644
--- a/extras/try_gzip_static.rb
+++ b/extras/try_gzip_static.rb
@@ -203,7 +203,7 @@ def r(code, exc = nil, env = nil)
msg = msg.dump if /[[:cntrl:]]/ =~ msg # prevent code injection
logger.warn("#{env['REQUEST_METHOD']} #{env['PATH_INFO']} " \
"#{code} #{msg}")
- if exc.respond_to?(:backtrace)
+ if exc.respond_to?(:backtrace) && !(SystemCallError === exc)
exc.backtrace.each { |line| logger.warn(line) }
end
end