From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS8100 96.44.188.0/22 X-Spam-Status: No, score=-2.2 required=3.0 tests=AWL,BAYES_00,RCVD_IN_XBL shortcircuit=no autolearn=no version=3.3.2 X-Original-To: spew@80x24.org Received: from 80x24.org (manning1.torservers.net [96.44.189.100]) by dcvr.yhbt.net (Postfix) with ESMTP id EFD461FAE6 for ; Fri, 5 Jun 2015 23:16:24 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg Date: Fri, 5 Jun 2015 23:16:20 +0000 Message-Id: <1433546180-5396-1-git-send-email-e@80x24.org> List-Id: As documented before, exceptions are expensive and IO::Wait*able are too common in socket applications to be the exceptional case. Datagram sockets deserve the same API which stream sockets are allowed with read_nonblock and write_nonblock. Note: this does not offer a performance advantage under optimal conditions when both ends are equally matched in speed, but it does make debug output cleaner by avoiding exceptions whenever the receiver slows down. 
--- ext/socket/ancdata.c | 32 +++++++++++++++++++++++++++----- ext/socket/basicsocket.c | 8 ++++++-- ext/socket/init.c | 11 +++++------ ext/socket/rubysocket.h | 8 ++++++++ ext/socket/socket.c | 6 ++---- ext/socket/udpsocket.c | 8 ++++++-- test/socket/test_nonblock.rb | 44 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 98 insertions(+), 19 deletions(-) diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c index 756cf7e..767a0bc 100644 --- a/ext/socket/ancdata.c +++ b/ext/socket/ancdata.c @@ -3,6 +3,7 @@ #include int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ +static VALUE sym_exception, sym_wait_readable, sym_wait_writable; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) static VALUE rb_cAncillaryData; @@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) VALUE data, vflags, dest_sockaddr; struct msghdr mh; struct iovec iov; + VALUE opts = Qnil; VALUE controls = Qnil; int controls_num; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) @@ -1152,7 +1154,8 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) if (argc == 0) rb_raise(rb_eArgError, "mesg argument required"); - rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls); + rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls, + &opts); StringValue(data); controls_num = RARRAY_LENINT(controls); @@ -1285,8 +1288,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) rb_io_check_closed(fptr); goto retry; } - if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) - rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block"); + if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { + if (rsock_opt_false_p(opts, sym_exception)) { + return sym_wait_writable; + } + rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, + "sendmsg(2) would block"); + } rb_sys_fail("sendmsg(2)"); } #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) @@ -1340,7 +1348,7 @@ 
rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock) #if defined(HAVE_SENDMSG) /* * call-seq: - * basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls) => numbytes_sent + * basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls, opts={}) => numbytes_sent * * sendmsg_nonblock sends a message using sendmsg(2) system call in non-blocking manner. * @@ -1348,6 +1356,9 @@ rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock) * but the non-blocking flag is set before the system call * and it doesn't retry the system call. * + * By specifying `exception: false`, the _opts_ hash allows you to indicate + * that sendmsg_nonblock should not raise an IO::WaitWritable exception, but + * return the symbol :wait_writable instead. */ VALUE rsock_bsock_sendmsg_nonblock(int argc, VALUE *argv, VALUE sock) @@ -1606,8 +1617,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) rb_io_check_closed(fptr); goto retry; } - if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) + if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { + if (rsock_opt_false_p(vopts, sym_exception)) { + return sym_wait_readable; + } rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block"); + } #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) { /* @@ -1792,6 +1807,9 @@ rsock_bsock_recvmsg(int argc, VALUE *argv, VALUE sock) * but non-blocking flag is set before the system call * and it doesn't retry the system call. * + * By specifying `exception: false`, the _opts_ hash allows you to indicate + * that recvmsg_nonblock should not raise an IO::WaitReadable exception, but + * return the symbol :wait_readable instead. 
*/ VALUE rsock_bsock_recvmsg_nonblock(int argc, VALUE *argv, VALUE sock) @@ -1837,4 +1855,8 @@ rsock_init_ancdata(void) rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0); rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0); #endif +#undef rb_intern + sym_exception = ID2SYM(rb_intern("exception")); + sym_wait_readable = ID2SYM(rb_intern("wait_readable")); + sym_wait_writable = ID2SYM(rb_intern("wait_writable")); } diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c index 5455977..4c79326 100644 --- a/ext/socket/basicsocket.c +++ b/ext/socket/basicsocket.c @@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) /* * call-seq: - * basicsocket.recv_nonblock(maxlen) => mesg - * basicsocket.recv_nonblock(maxlen, flags) => mesg + * basicsocket.recv_nonblock(maxlen [, flags [, options ]]) => mesg * * Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after * O_NONBLOCK is set for the underlying file descriptor. @@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) * === Parameters * * +maxlen+ - the number of bytes to receive from the socket * * +flags+ - zero or more of the +MSG_+ options + * * +options+ - keyword hash, supporting `exception: false` * * === Example * serv = TCPServer.new("127.0.0.1", 0) @@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) * it is extended by IO::WaitReadable. * So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock. * + * By specifying `exception: false`, the options hash allows you to indicate + * that recv_nonblock should not raise an IO::WaitReadable exception, but + * return the symbol :wait_readable instead. 
+ * * === See * * Socket#recvfrom */ diff --git a/ext/socket/init.c b/ext/socket/init.c index 455652d..5f0d445 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -188,9 +188,10 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type long slen; int fd, flags; VALUE addr = Qnil; + VALUE opts = Qnil; socklen_t len0; - rb_scan_args(argc, argv, "11", &len, &flg); + rb_scan_args(argc, argv, "11:", &len, &flg, &opts); if (flg == Qnil) flags = 0; else flags = NUM2INT(flg); @@ -226,6 +227,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif + if (rsock_opt_false_p(opts, sym_exception)) + return sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block"); } rb_sys_fail("recvfrom(2)"); @@ -528,14 +531,10 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr, struct sockaddr *sockaddr, socklen_t *len) { int fd2; - int ex = 1; VALUE opts = Qnil; rb_scan_args(argc, argv, "0:", &opts); - if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) - ex = 0; - rb_secure(3); rb_io_set_nonblock(fptr); fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1); @@ -549,7 +548,7 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr, #if defined EPROTO case EPROTO: #endif - if (!ex) + if (rsock_opt_false_p(opts, sym_exception)) return sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "accept(2) would block"); } diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h index 359d28e..d03b1c5 100644 --- a/ext/socket/rubysocket.h +++ b/ext/socket/rubysocket.h @@ -423,4 +423,12 @@ static inline void rsock_maybe_wait_fd(int fd) { } # define MSG_DONTWAIT_RELIABLE 0 #endif +static inline int +rsock_opt_false_p(VALUE opt, VALUE sym) +{ + if (!NIL_P(opt) && Qfalse == rb_hash_lookup2(opt, sym, Qundef)) + return 1; + return 0; +} + #endif diff 
--git a/ext/socket/socket.c b/ext/socket/socket.c index bb23703..35665e5 100644 --- a/ext/socket/socket.c +++ b/ext/socket/socket.c @@ -503,15 +503,13 @@ sock_connect_nonblock(int argc, VALUE *argv, VALUE sock) n = connect(fptr->fd, (struct sockaddr*)RSTRING_PTR(addr), RSTRING_SOCKLEN(addr)); if (n < 0) { if (errno == EINPROGRESS) { - if (!NIL_P(opts) && - Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) { + if (rsock_opt_false_p(opts, sym_exception)) { return sym_wait_writable; } rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "connect(2) would block"); } if (errno == EISCONN) { - if (!NIL_P(opts) && - Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) { + if (rsock_opt_false_p(opts, sym_exception)) { return INT2FIX(0); } } diff --git a/ext/socket/udpsocket.c b/ext/socket/udpsocket.c index eb605ca..86d18f4 100644 --- a/ext/socket/udpsocket.c +++ b/ext/socket/udpsocket.c @@ -194,8 +194,7 @@ udp_send(int argc, VALUE *argv, VALUE sock) /* * call-seq: - * udpsocket.recvfrom_nonblock(maxlen) => [mesg, sender_inet_addr] - * udpsocket.recvfrom_nonblock(maxlen, flags) => [mesg, sender_inet_addr] + * udpsocket.recvfrom_nonblock(maxlen [, flags [, options]]) => [mesg, sender_inet_addr] * * Receives up to _maxlen_ bytes from +udpsocket+ using recvfrom(2) after * O_NONBLOCK is set for the underlying file descriptor. @@ -211,6 +210,7 @@ udp_send(int argc, VALUE *argv, VALUE sock) * === Parameters * * +maxlen+ - the number of bytes to receive from the socket * * +flags+ - zero or more of the +MSG_+ options + * * +options+ - keyword hash, supporting `exception: false` * * === Example * require 'socket' @@ -238,6 +238,10 @@ udp_send(int argc, VALUE *argv, VALUE sock) * it is extended by IO::WaitReadable. * So IO::WaitReadable can be used to rescue the exceptions for retrying recvfrom_nonblock. 
* + * By specifying `exception: false`, the options hash allows you to indicate + * that recvfrom_nonblock should not raise an IO::WaitReadable exception, but + * return the symbol :wait_readable instead. + * * === See * * Socket#recvfrom */ diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb index b1959ee..316ec5f 100644 --- a/test/socket/test_nonblock.rb +++ b/test/socket/test_nonblock.rb @@ -1,6 +1,7 @@ begin require "socket" require "io/nonblock" + require "io/wait" rescue LoadError end @@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase } end + def test_recvfrom_nonblock_no_exception + udp_pair do |s1, s2| + assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false) + s2.send("aaa", 0, s1.getsockname) + assert s1.wait_readable + mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false) + assert_equal(4, inet_addr.length) + assert_equal("aaa", mesg) + end + end + if defined?(UNIXSocket) && defined?(Socket::SOCK_SEQPACKET) def test_sendmsg_nonblock_seqpacket buf = '*' * 10000 @@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}" end + + def test_sendmsg_nonblock_no_exception + buf = '*' * 128 + UNIXSocket.pair(:SEQPACKET) do |s1, s2| + n = 0 + Timeout.timeout(60) do + case rv = s1.sendmsg_nonblock(buf, exception: false) + when Integer + n += rv + when :wait_writable + break + else + flunk "unexpected return value: #{rv.inspect}" + end while true + assert_equal :wait_writable, rv + assert_operator n, :>, 0 + end + end + rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT + skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}" + end end def test_recvmsg_nonblock_error @@ -297,6 +330,7 @@ class TestSocketNonblock < Test::Unit::TestCase rescue Errno::EWOULDBLOCK assert_kind_of(IO::WaitReadable, $!) 
end + assert_equal :wait_readable, s1.recvmsg_nonblock(11, exception: false) } end @@ -310,6 +344,16 @@ class TestSocketNonblock < Test::Unit::TestCase } end + def test_recv_nonblock_no_exception + tcp_pair {|c, s| + assert_equal :wait_readable, c.recv_nonblock(11, exception: false) + s.write('HI') + assert c.wait_readable + assert_equal 'HI', c.recv_nonblock(11, exception: false) + assert_equal :wait_readable, c.recv_nonblock(11, exception: false) + } + end + def test_connect_nonblock_error serv = TCPServer.new("127.0.0.1", 0) _, port, _, _ = serv.addr -- EW