From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS43350 77.247.176.0/21 X-Spam-Status: No, score=-1.5 required=3.0 tests=AWL,BAYES_00, RCVD_IN_DNSWL_BLOCKED,RCVD_IN_XBL shortcircuit=no autolearn=no version=3.3.2 X-Original-To: spew@80x24.org Received: from 80x24.org (politkovskaja.torservers.net [77.247.181.165]) by dcvr.yhbt.net (Postfix) with ESMTP id 75E4B1FA55 for ; Thu, 4 Jun 2015 22:07:42 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg Date: Thu, 4 Jun 2015 22:07:40 +0000 Message-Id: <1433455660-12060-1-git-send-email-e@80x24.org> List-Id: --- ext/socket/ancdata.c | 36 +++++++++++++++++++++++++++++------- ext/socket/basicsocket.c | 8 ++++++-- ext/socket/init.c | 9 ++++++++- test/socket/test_nonblock.rb | 43 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 86 insertions(+), 10 deletions(-) diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c index 277a1e8..9594b61 100644 --- a/ext/socket/ancdata.c +++ b/ext/socket/ancdata.c @@ -3,6 +3,7 @@ #include int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ +static VALUE sym_exception, sym_wait_readable, sym_wait_writable; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) static VALUE rb_cAncillaryData; @@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) VALUE data, vflags, dest_sockaddr; struct msghdr mh; struct iovec iov; + VALUE opts = Qnil; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) VALUE controls = Qnil; VALUE controls_str = 0; @@ -1140,6 +1142,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) #endif int flags; ssize_t ss; + int ex = 1; GetOpenFile(sock, fptr); #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) @@ -1151,11 +1154,15 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) if (argc == 0) rb_raise(rb_eArgError, 
"mesg argument required"); #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) - rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls); + rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls, + &opts); #else - rb_scan_args(argc, argv, "12", &data, &vflags, &dest_sockaddr); + rb_scan_args(argc, argv, "12:", &data, &vflags, &dest_sockaddr, &opts); #endif + if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) + ex = 0; + StringValue(data); if (!NIL_P(controls)) { @@ -1287,8 +1294,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) rb_io_check_closed(fptr); goto retry; } - if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) - rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block"); + if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { + if (ex) { + rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, + "sendmsg(2) would block"); + } + return sym_wait_writable; + } rb_sys_fail("sendmsg(2)"); } #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) @@ -1498,6 +1510,7 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) VALUE ret; ssize_t ss; int request_scm_rights; + int ex = 1; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) struct cmsghdr *cmh; size_t maxctllen; @@ -1531,8 +1544,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) grow_buffer = NIL_P(vmaxdatlen) || NIL_P(vmaxctllen); request_scm_rights = 0; - if (!NIL_P(vopts) && RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights"))))) - request_scm_rights = 1; + if (!NIL_P(vopts)) { + if (RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights"))))) + request_scm_rights = 1; + if (nonblock && Qfalse == rb_hash_lookup2(vopts, sym_exception, Qundef)) + ex = 0; + } #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) if (request_scm_rights) rb_raise(rb_eNotImpError, "control message for recvmsg is unimplemented"); @@ -1608,8 +1625,10 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, 
int nonblock) rb_io_check_closed(fptr); goto retry; } - if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) + if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { + if (ex) return sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block"); + } #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) { /* @@ -1839,4 +1858,7 @@ rsock_init_ancdata(void) rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0); rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0); #endif + sym_exception = ID2SYM(rb_intern("exception")); + sym_wait_readable = ID2SYM(rb_intern("wait_readable")); + sym_wait_writable = ID2SYM(rb_intern("wait_writable")); } diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c index 5455977..4c79326 100644 --- a/ext/socket/basicsocket.c +++ b/ext/socket/basicsocket.c @@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) /* * call-seq: - * basicsocket.recv_nonblock(maxlen) => mesg - * basicsocket.recv_nonblock(maxlen, flags) => mesg + * basicsocket.recv_nonblock(maxlen [, flags [, options ]]) => mesg * * Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after * O_NONBLOCK is set for the underlying file descriptor. @@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) * === Parameters * * +maxlen+ - the number of bytes to receive from the socket * * +flags+ - zero or more of the +MSG_+ options + * * +options+ - keyword hash, supporting `exception: false` * * === Example * serv = TCPServer.new("127.0.0.1", 0) @@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) * it is extended by IO::WaitReadable. * So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock.
* + * By specifying `exception: false`, the options hash allows you to indicate + * that recv_nonblock should not raise an IO::WaitReadable exception, but + * return the symbol :wait_readable instead. + * * === See * * Socket#recvfrom */ diff --git a/ext/socket/init.c b/ext/socket/init.c index 455652d..eadeba9 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -188,14 +188,19 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type long slen; int fd, flags; VALUE addr = Qnil; + VALUE opts = Qnil; socklen_t len0; + int ex = 1; - rb_scan_args(argc, argv, "11", &len, &flg); + rb_scan_args(argc, argv, "11:", &len, &flg, &opts); if (flg == Qnil) flags = 0; else flags = NUM2INT(flg); buflen = NUM2INT(len); + if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) + ex = 0; + #ifdef MSG_DONTWAIT /* MSG_DONTWAIT avoids the race condition between fcntl and recvfrom. It is not portable, though. */ @@ -226,6 +231,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif + if (!ex) + return sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block"); } rb_sys_fail("recvfrom(2)"); diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb index b1959ee..b30a52f 100644 --- a/test/socket/test_nonblock.rb +++ b/test/socket/test_nonblock.rb @@ -1,6 +1,7 @@ begin require "socket" require "io/nonblock" + require "io/wait" rescue LoadError end @@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase } end + def test_recvfrom_nonblock_no_exception + udp_pair do |s1, s2| + assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false) + s2.send("aaa", 0, s1.getsockname) + assert s1.wait_readable + mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false) + assert_equal(4, inet_addr.length) + assert_equal("aaa", mesg) + end + end + if defined?(UNIXSocket) &&
defined?(Socket::SOCK_SEQPACKET) def test_sendmsg_nonblock_seqpacket buf = '*' * 10000 @@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}" end + + def test_sendmsg_nonblock_no_exception + buf = '*' * 128 + UNIXSocket.pair(:SEQPACKET) do |s1, s2| + n = 0 + Timeout.timeout(60) do + case rv = s1.sendmsg_nonblock(buf, exception: false) + when Integer + n += rv + when :wait_writable + break + else + flunk "unexpected return value: #{rv.inspect}" + end while true + assert_equal :wait_writable, rv + assert_operator n, :>, 0 + end + end + rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT + skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}" + end end def test_recvmsg_nonblock_error @@ -310,6 +343,16 @@ class TestSocketNonblock < Test::Unit::TestCase } end + def test_recv_nonblock_no_exception + tcp_pair {|c, s| + assert_equal :wait_readable, c.recv_nonblock(11, exception: false) + s.write('HI') + assert c.wait_readable + assert_equal 'HI', c.recv_nonblock(11, exception: false) + assert_equal :wait_readable, c.recv_nonblock(11, exception: false) + } + end + def test_connect_nonblock_error serv = TCPServer.new("127.0.0.1", 0) _, port, _, _ = serv.addr -- EW