From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-3.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, RP_MATCHES_RCVD shortcircuit=no autolearn=unavailable version=3.3.2 X-Original-To: spew@80x24.org Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 613CF20416 for ; Thu, 2 Jul 2015 01:36:32 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH] socket: support accept `sock_nonblock: (true|false)' Date: Thu, 2 Jul 2015 01:36:32 +0000 Message-Id: <1435800992-18192-1-git-send-email-e@80x24.org> List-Id: [Feature #11139] An application wanting to do non-blocking accept may want to create a blocking accepted socket; allow it with a kwarg while preserving default behavior. This is analogous to the SOCK_NONBLOCK flag in the Linux `accept4' syscall. While this has little obvious effect for Ruby API users (who can emulate blocking behavior), this will reduce syscalls made internally by Ruby. Forcing blocking will preserve "wake-one" behavior in the OS kernel to avoid a "thundering herd" problem. In all cases, existing Ruby 2.2 behavior is preserved by default to maximize compatibility, especially when sharing sockets with non-Ruby processes: `accept' and `sysaccept' calls will create sockets which are blocking by default. `accept_nonblock' calls will create sockets which are non-blocking by default. 
--- ext/socket/ancdata.c | 4 +-- ext/socket/init.c | 41 ++++++++++++++++------ ext/socket/rubysocket.h | 6 ++-- ext/socket/socket.c | 16 ++++----- ext/socket/tcpserver.c | 13 +++---- ext/socket/unixserver.c | 13 +++---- test/socket/test_nonblock.rb | 83 ++++++++++++++++++++++++++++++++++++++++++-- test/socket/test_tcp.rb | 53 ++++++++++++++++++++++++++++ test/socket/test_unix.rb | 67 +++++++++++++++++++++++++++++++++++ 9 files changed, 258 insertions(+), 38 deletions(-) diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c index d0290ba..fddcfe7 100644 --- a/ext/socket/ancdata.c +++ b/ext/socket/ancdata.c @@ -1285,7 +1285,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) goto retry; } if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { - if (rsock_opt_false_p(opts, sym_exception)) { + if (rsock_opt_eq(opts, sym_exception, Qfalse)) { return sym_wait_writable; } rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, @@ -1602,7 +1602,7 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) goto retry; } if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { - if (rsock_opt_false_p(vopts, sym_exception)) { + if (rsock_opt_eq(vopts, sym_exception, Qfalse)) { return sym_wait_readable; } rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block"); diff --git a/ext/socket/init.c b/ext/socket/init.c index 34f6a11..de3b63d 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -29,7 +29,7 @@ VALUE rb_cSOCKSSocket; #endif int rsock_do_not_reverse_lookup = 1; -static VALUE sym_exception, sym_wait_readable; +static VALUE sym_exception, sym_wait_readable, sym_sock_nonblock; void rsock_raise_socket_error(const char *reason, int error) @@ -247,7 +247,7 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif - if (rsock_opt_false_p(opts, sym_exception)) + if (rsock_opt_eq(opts, sym_exception, Qfalse)) return 
sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block"); } @@ -498,7 +498,7 @@ make_fd_nonblock(int fd) static int cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len, - int nonblock) + int sock_nonblock) { int ret; socklen_t len0 = 0; @@ -513,7 +513,7 @@ cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len, flags |= SOCK_CLOEXEC; #endif #ifdef SOCK_NONBLOCK - if (nonblock) { + if (sock_nonblock) { flags |= SOCK_NONBLOCK; } #endif @@ -523,7 +523,7 @@ cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len, if (ret <= 2) rb_maygvl_fd_fix_cloexec(ret); #ifndef SOCK_NONBLOCK - if (nonblock) { + if (sock_nonblock) { make_fd_nonblock(ret); } #endif @@ -540,7 +540,7 @@ cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len, if (ret == -1) return -1; if (address_len && len0 < *address_len) *address_len = len0; rb_maygvl_fd_fix_cloexec(ret); - if (nonblock) { + if (sock_nonblock) { make_fd_nonblock(ret); } return ret; @@ -551,12 +551,17 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr, struct sockaddr *sockaddr, socklen_t *len) { int fd2; - VALUE opts = Qnil; + VALUE opts; + int sock_nonblock = 1; rb_scan_args(argc, argv, "0:", &opts); + if (rsock_opt_eq(opts, sym_sock_nonblock, Qfalse)) + sock_nonblock = 0; + rb_io_set_nonblock(fptr); - fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1); + fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, + sock_nonblock); if (fd2 < 0) { switch (errno) { case EAGAIN: @@ -567,7 +572,7 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr, #if defined EPROTO case EPROTO: #endif - if (rsock_opt_false_p(opts, sym_exception)) + if (rsock_opt_eq(opts, sym_exception, Qfalse)) return sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "accept(2) would block"); } @@ -579,6 +584,7 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE 
klass, rb_io_t *fptr, struct accept_arg { int fd; + int sock_nonblock; struct sockaddr *sockaddr; socklen_t *len; }; @@ -587,19 +593,31 @@ static VALUE accept_blocking(void *data) { struct accept_arg *arg = data; - return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len, 0); + return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len, + arg->sock_nonblock); } VALUE -rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len) +rsock_s_accept(int argc, VALUE *argv, VALUE klass, int fd, + struct sockaddr *sockaddr, socklen_t *len) { int fd2; int retry = 0; struct accept_arg arg; + VALUE opts; arg.fd = fd; + arg.sock_nonblock = 0; arg.sockaddr = sockaddr; arg.len = len; + + rb_scan_args(argc, argv, "0:", &opts); + + if (!NIL_P(opts) && + rb_hash_lookup2(opts, sym_sock_nonblock, Qundef) == Qtrue) { + arg.sock_nonblock = 1; + } + retry: rsock_maybe_wait_fd(fd); fd2 = (int)BLOCKING_REGION_FD(accept_blocking, &arg); @@ -659,4 +677,5 @@ rsock_init_socket_init(void) #undef rb_intern sym_exception = ID2SYM(rb_intern("exception")); sym_wait_readable = ID2SYM(rb_intern("wait_readable")); + sym_sock_nonblock = ID2SYM(rb_intern("sock_nonblock")); } diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h index d03b1c5..11d5a78 100644 --- a/ext/socket/rubysocket.h +++ b/ext/socket/rubysocket.h @@ -342,7 +342,7 @@ VALUE rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type fr int rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks); -VALUE rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len); +VALUE rsock_s_accept(int argc, VALUE *argv, VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len); VALUE rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr, struct sockaddr *sockaddr, socklen_t *len); VALUE rsock_sock_listen(VALUE sock, VALUE log); @@ -424,9 +424,9 @@ static inline void rsock_maybe_wait_fd(int fd) { } #endif static inline int 
-rsock_opt_false_p(VALUE opt, VALUE sym) +rsock_opt_eq(VALUE opt, VALUE sym, VALUE exp) { - if (!NIL_P(opt) && Qfalse == rb_hash_lookup2(opt, sym, Qundef)) + if (!NIL_P(opt) && exp == rb_hash_lookup2(opt, sym, Qundef)) return 1; return 0; } diff --git a/ext/socket/socket.c b/ext/socket/socket.c index 2cda4cb..73ebac4 100644 --- a/ext/socket/socket.c +++ b/ext/socket/socket.c @@ -502,13 +502,13 @@ sock_connect_nonblock(int argc, VALUE *argv, VALUE sock) n = connect(fptr->fd, (struct sockaddr*)RSTRING_PTR(addr), RSTRING_SOCKLEN(addr)); if (n < 0) { if (errno == EINPROGRESS) { - if (rsock_opt_false_p(opts, sym_exception)) { + if (rsock_opt_eq(opts, sym_exception, Qfalse)) { return sym_wait_writable; } rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "connect(2) would block"); } if (errno == EISCONN) { - if (rsock_opt_false_p(opts, sym_exception)) { + if (rsock_opt_eq(opts, sym_exception, Qfalse)) { return INT2FIX(0); } } @@ -896,7 +896,7 @@ sock_recvfrom_nonblock(int argc, VALUE *argv, VALUE sock) * */ static VALUE -sock_accept(VALUE sock) +sock_accept(int argc, VALUE *argv, VALUE sock) { rb_io_t *fptr; VALUE sock2; @@ -904,7 +904,7 @@ sock_accept(VALUE sock) socklen_t len = (socklen_t)sizeof buf; GetOpenFile(sock, fptr); - sock2 = rsock_s_accept(rb_cSocket,fptr->fd,&buf.addr,&len); + sock2 = rsock_s_accept(argc, argv, rb_cSocket, fptr->fd, &buf.addr, &len); return rb_assoc_new(sock2, rsock_io_socket_addrinfo(sock2, &buf.addr, len)); } @@ -1020,7 +1020,7 @@ sock_accept_nonblock(int argc, VALUE *argv, VALUE sock) * * Socket#accept */ static VALUE -sock_sysaccept(VALUE sock) +sock_sysaccept(int argc, VALUE *argv, VALUE sock) { rb_io_t *fptr; VALUE sock2; @@ -1028,7 +1028,7 @@ sock_sysaccept(VALUE sock) socklen_t len = (socklen_t)sizeof buf; GetOpenFile(sock, fptr); - sock2 = rsock_s_accept(0,fptr->fd,&buf.addr,&len); + sock2 = rsock_s_accept(argc, argv, 0, fptr->fd, &buf.addr, &len); return rb_assoc_new(sock2, rsock_io_socket_addrinfo(sock2, &buf.addr, len)); } @@ -2175,9 
+2175,9 @@ Init_socket(void) rb_define_method(rb_cSocket, "connect_nonblock", sock_connect_nonblock, -1); rb_define_method(rb_cSocket, "bind", sock_bind, 1); rb_define_method(rb_cSocket, "listen", rsock_sock_listen, 1); - rb_define_method(rb_cSocket, "accept", sock_accept, 0); + rb_define_method(rb_cSocket, "accept", sock_accept, -1); rb_define_method(rb_cSocket, "accept_nonblock", sock_accept_nonblock, -1); - rb_define_method(rb_cSocket, "sysaccept", sock_sysaccept, 0); + rb_define_method(rb_cSocket, "sysaccept", sock_sysaccept, -1); rb_define_method(rb_cSocket, "recvfrom", sock_recvfrom, -1); rb_define_method(rb_cSocket, "recvfrom_nonblock", sock_recvfrom_nonblock, -1); diff --git a/ext/socket/tcpserver.c b/ext/socket/tcpserver.c index 578377b..a46eac5 100644 --- a/ext/socket/tcpserver.c +++ b/ext/socket/tcpserver.c @@ -53,7 +53,7 @@ tcp_svr_init(int argc, VALUE *argv, VALUE sock) * */ static VALUE -tcp_accept(VALUE sock) +tcp_accept(int argc, VALUE *argv, VALUE sock) { rb_io_t *fptr; union_sockaddr from; @@ -61,7 +61,8 @@ tcp_accept(VALUE sock) GetOpenFile(sock, fptr); fromlen = (socklen_t)sizeof(from); - return rsock_s_accept(rb_cTCPSocket, fptr->fd, &from.addr, &fromlen); + return rsock_s_accept(argc, argv, rb_cTCPSocket, + fptr->fd, &from.addr, &fromlen); } /* @@ -128,7 +129,7 @@ tcp_accept_nonblock(int argc, VALUE *argv, VALUE sock) * */ static VALUE -tcp_sysaccept(VALUE sock) +tcp_sysaccept(int argc, VALUE *argv, VALUE sock) { rb_io_t *fptr; union_sockaddr from; @@ -136,7 +137,7 @@ tcp_sysaccept(VALUE sock) GetOpenFile(sock, fptr); fromlen = (socklen_t)sizeof(from); - return rsock_s_accept(0, fptr->fd, &from.addr, &fromlen); + return rsock_s_accept(argc, argv, 0, fptr->fd, &from.addr, &fromlen); } void @@ -174,9 +175,9 @@ rsock_init_tcpserver(void) * */ rb_cTCPServer = rb_define_class("TCPServer", rb_cTCPSocket); - rb_define_method(rb_cTCPServer, "accept", tcp_accept, 0); + rb_define_method(rb_cTCPServer, "accept", tcp_accept, -1); 
rb_define_method(rb_cTCPServer, "accept_nonblock", tcp_accept_nonblock, -1); - rb_define_method(rb_cTCPServer, "sysaccept", tcp_sysaccept, 0); + rb_define_method(rb_cTCPServer, "sysaccept", tcp_sysaccept, -1); rb_define_method(rb_cTCPServer, "initialize", tcp_svr_init, -1); rb_define_method(rb_cTCPServer, "listen", rsock_sock_listen, 1); /* in socket.c */ } diff --git a/ext/socket/unixserver.c b/ext/socket/unixserver.c index a38d0cb..cc936fe 100644 --- a/ext/socket/unixserver.c +++ b/ext/socket/unixserver.c @@ -45,7 +45,7 @@ unix_svr_init(VALUE sock, VALUE path) * */ static VALUE -unix_accept(VALUE sock) +unix_accept(int argc, VALUE *argv, VALUE sock) { rb_io_t *fptr; struct sockaddr_un from; @@ -53,7 +53,7 @@ unix_accept(VALUE sock) GetOpenFile(sock, fptr); fromlen = (socklen_t)sizeof(struct sockaddr_un); - return rsock_s_accept(rb_cUNIXSocket, fptr->fd, + return rsock_s_accept(argc, argv, rb_cUNIXSocket, fptr->fd, (struct sockaddr*)&from, &fromlen); } @@ -126,7 +126,7 @@ unix_accept_nonblock(int argc, VALUE *argv, VALUE sock) * */ static VALUE -unix_sysaccept(VALUE sock) +unix_sysaccept(int argc, VALUE *argv, VALUE sock) { rb_io_t *fptr; struct sockaddr_un from; @@ -134,7 +134,8 @@ unix_sysaccept(VALUE sock) GetOpenFile(sock, fptr); fromlen = (socklen_t)sizeof(struct sockaddr_un); - return rsock_s_accept(0, fptr->fd, (struct sockaddr*)&from, &fromlen); + return rsock_s_accept(argc, argv, 0, fptr->fd, + (struct sockaddr*)&from, &fromlen); } #endif @@ -151,9 +152,9 @@ rsock_init_unixserver(void) */ rb_cUNIXServer = rb_define_class("UNIXServer", rb_cUNIXSocket); rb_define_method(rb_cUNIXServer, "initialize", unix_svr_init, 1); - rb_define_method(rb_cUNIXServer, "accept", unix_accept, 0); + rb_define_method(rb_cUNIXServer, "accept", unix_accept, -1); rb_define_method(rb_cUNIXServer, "accept_nonblock", unix_accept_nonblock, -1); - rb_define_method(rb_cUNIXServer, "sysaccept", unix_sysaccept, 0); + rb_define_method(rb_cUNIXServer, "sysaccept", unix_sysaccept, -1); 
rb_define_method(rb_cUNIXServer, "listen", rsock_sock_listen, 1); /* in socket.c */ #endif } diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb index 0853a15..0c47b39 100644 --- a/test/socket/test_nonblock.rb +++ b/test/socket/test_nonblock.rb @@ -26,8 +26,20 @@ class TestSocketNonblock < Test::Unit::TestCase s, sockaddr = serv.accept_nonblock end assert_equal(Socket.unpack_sockaddr_in(c.getsockname), Socket.unpack_sockaddr_in(sockaddr)) - if s.respond_to?(:nonblock?) - assert_predicate(s, :nonblock?, 'accepted socket is non-blocking') + + assert_predicate(s, :nonblock?, 'default behavior should be non-blocking') + + [ true, false ].each do |nb| + begin + b = Socket.new(:INET, :STREAM) + b.connect(serv.getsockname) + serv.wait_readable + a, _ = serv.accept_nonblock(sock_nonblock: nb) + assert_equal nb, a.nonblock? + ensure + a.close if a + b.close if b + end end ensure serv.close if serv @@ -392,4 +404,71 @@ class TestSocketNonblock < Test::Unit::TestCase s.close if s && !s.closed? end + def test_accept_sock_nonblock + serv = Socket.new(:INET, :STREAM) + serv.bind(Socket.sockaddr_in(0, "127.0.0.1")) + serv.listen(5) + begin + s, _ = serv.accept_nonblock + rescue Errno::EWOULDBLOCK + assert_kind_of(IO::WaitReadable, $!) + end + ensure + serv.close if serv && !serv.closed? + s.close if s && !s.closed? + end + + def test_accept_blocking_sock_nonblock + serv = Socket.new(:INET, :STREAM) + serv.bind(Socket.sockaddr_in(0, "127.0.0.1")) + serv.listen(5) + + begin + b = Socket.new(:INET, :STREAM) + b.connect(serv.getsockname) + a, _ = serv.accept + refute_predicate a, :nonblock? + ensure + a.close + b.close + end + + begin + b = Socket.new(:INET, :STREAM) + b.connect(serv.getsockname) + a, _ = serv.sysaccept + a = Socket.for_fd(a) + refute_predicate a, :nonblock? 
+ ensure + a.close + b.close + end + + [ true, false ].each do |nb| + begin + b = Socket.new(:INET, :STREAM) + b.connect(serv.getsockname) + serv.wait_readable + a, _ = serv.accept(sock_nonblock: nb) + assert_equal nb, a.nonblock? + ensure + a.close if a + b.close if b + end + + begin + b = Socket.new(:INET, :STREAM) + b.connect(serv.getsockname) + serv.wait_readable + a, _ = serv.sysaccept(sock_nonblock: nb) + a = Socket.for_fd(a) + assert_equal nb, a.nonblock? + ensure + a.close if a + b.close if b + end + end + ensure + serv.close + end end if defined?(Socket) diff --git a/test/socket/test_tcp.rb b/test/socket/test_tcp.rb index 5e3528e..357f441 100644 --- a/test/socket/test_tcp.rb +++ b/test/socket/test_tcp.rb @@ -4,6 +4,8 @@ begin rescue LoadError end +require "io/wait" +require "io/nonblock" class TestSocket_TCPSocket < Test::Unit::TestCase def test_initialize_failure @@ -82,6 +84,57 @@ class TestSocket_TCPSocket < Test::Unit::TestCase assert_raise(IO::WaitReadable) { svr.accept_nonblock } assert_equal :wait_readable, svr.accept_nonblock(exception: false) assert_raise(IO::WaitReadable) { svr.accept_nonblock(exception: true) } + addr = svr.addr + host, port = addr[3], addr[1] + + begin + c = TCPSocket.new(host, port) + svr.wait_readable + a = svr.accept_nonblock + assert_predicate a, :nonblock?, 'default behavior' + ensure + c.close if c + a.close if a + end + + [ true, false ].each do |nb| + begin + c = TCPSocket.new(host, port) + svr.wait_readable + a = svr.accept_nonblock(sock_nonblock: nb) + assert_equal nb, a.nonblock? 
+ ensure + c.close if c + a.close if a + end + end } end + + def test_accept_sock_nonblock + TCPServer.open("localhost", 0) do |svr| + addr = svr.addr + host, port = addr[3], addr[1] + + begin + c = TCPSocket.new(host, port) + a = svr.accept + refute_predicate a, :nonblock?, 'default behavior' + ensure + c.close if c + a.close if a + end + + [ true, false ].each do |nb| + begin + c = TCPSocket.new(host, port) + a = svr.accept(sock_nonblock: nb) + assert_equal nb, a.nonblock? + ensure + c.close if c + a.close if a + end + end + end + end end if defined?(TCPSocket) diff --git a/test/socket/test_unix.rb b/test/socket/test_unix.rb index 9ca97d2..2ae0f9a 100644 --- a/test/socket/test_unix.rb +++ b/test/socket/test_unix.rb @@ -9,6 +9,7 @@ require "timeout" require "tmpdir" require "thread" require "io/nonblock" +require "io/wait" class TestSocket_UNIXSocket < Test::Unit::TestCase def test_fd_passing @@ -675,6 +676,72 @@ class TestSocket_UNIXSocket < Test::Unit::TestCase assert_raise(IO::WaitReadable) { serv.accept_nonblock } assert_raise(IO::WaitReadable) { serv.accept_nonblock(exception: true) } assert_equal :wait_readable, serv.accept_nonblock(exception: false) + + begin + c = UNIXSocket.new(path) + serv.wait_readable + a = serv.accept_nonblock + assert_predicate a, :nonblock?, 'default behavior' + ensure + c.close if c + a.close if a + end + + [ true, false ].each do |nb| + begin + c = UNIXSocket.new(path) + serv.wait_readable + a = serv.accept_nonblock(sock_nonblock: nb) + assert_equal nb, a.nonblock? 
+ ensure + c.close if c + a.close if a + end + end } end + + def test_accept_sock_nonblock + bound_unix_socket(UNIXServer) do |serv, path| + begin + c = UNIXSocket.new(path) + a = serv.accept + refute_predicate a, :nonblock?, 'default behavior' + ensure + c.close if c + a.close if a + end + + begin + c = UNIXSocket.new(path) + a = serv.sysaccept + a = UNIXSocket.for_fd(a) + refute_predicate a, :nonblock?, 'default behavior' + ensure + c.close if c + a.close if a + end + + [ true, false ].each do |nb| + begin + c = UNIXSocket.new(path) + a = serv.accept(sock_nonblock: nb) + assert_equal nb, a.nonblock? + ensure + c.close if c + a.close if a + end + + begin + c = UNIXSocket.new(path) + a = serv.sysaccept(sock_nonblock: nb) + a = UNIXSocket.for_fd(a) + assert_equal nb, a.nonblock? + ensure + c.close if c + a.close if a + end + end + end + end end if defined?(UNIXSocket) && /cygwin/ !~ RUBY_PLATFORM -- EW