From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg
Date: Thu, 4 Jun 2015 22:08:46 +0000 [thread overview]
Message-ID: <1433455726-12175-1-git-send-email-e@80x24.org> (raw)
---
ext/socket/ancdata.c | 36 +++++++++++++++++++++++++++++-------
ext/socket/basicsocket.c | 8 ++++++--
ext/socket/init.c | 9 ++++++++-
test/socket/test_nonblock.rb | 43 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 86 insertions(+), 10 deletions(-)
diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c
index 277a1e8..9594b61 100644
--- a/ext/socket/ancdata.c
+++ b/ext/socket/ancdata.c
@@ -3,6 +3,7 @@
#include <time.h>
int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
+static VALUE sym_exception, sym_wait_readable, sym_wait_writable;
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
static VALUE rb_cAncillaryData;
@@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
VALUE data, vflags, dest_sockaddr;
struct msghdr mh;
struct iovec iov;
+ VALUE opts = Qnil;
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
VALUE controls = Qnil;
VALUE controls_str = 0;
@@ -1140,6 +1142,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
#endif
int flags;
ssize_t ss;
+ int ex = 1;
GetOpenFile(sock, fptr);
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1151,11 +1154,15 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
if (argc == 0)
rb_raise(rb_eArgError, "mesg argument required");
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
- rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls);
+ rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls,
+ &opts);
#else
- rb_scan_args(argc, argv, "12", &data, &vflags, &dest_sockaddr);
+ rb_scan_args(argc, argv, "12:", &data, &vflags, &dest_sockaddr, &opts);
#endif
+ if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
+ ex = 0;
+
StringValue(data);
if (!NIL_P(controls)) {
@@ -1287,8 +1294,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
rb_io_check_closed(fptr);
goto retry;
}
- if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
- rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block");
+ if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+ if (ex) {
+ rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE,
+ "sendmsg(2) would block");
+ }
+ return sym_wait_writable;
+ }
rb_sys_fail("sendmsg(2)");
}
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1498,6 +1510,7 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
VALUE ret;
ssize_t ss;
int request_scm_rights;
+ int ex = 1;
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
struct cmsghdr *cmh;
size_t maxctllen;
@@ -1531,8 +1544,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
grow_buffer = NIL_P(vmaxdatlen) || NIL_P(vmaxctllen);
request_scm_rights = 0;
- if (!NIL_P(vopts) && RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights")))))
- request_scm_rights = 1;
+ if (!NIL_P(vopts)) {
+ if (RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights")))))
+ request_scm_rights = 1;
+ if (nonblock && Qfalse == rb_hash_lookup2(vopts, sym_exception, Qundef))
+ ex = 0;
+ }
#if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
if (request_scm_rights)
rb_raise(rb_eNotImpError, "control message for recvmsg is unimplemented");
@@ -1608,8 +1625,10 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
rb_io_check_closed(fptr);
goto retry;
}
- if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
+ if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+ if (!ex) return sym_wait_readable;
rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block");
+ }
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) {
/*
@@ -1839,4 +1858,7 @@ rsock_init_ancdata(void)
rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0);
rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0);
#endif
+ sym_exception = ID2SYM(rb_intern("exception"));
+ sym_wait_readable = ID2SYM(rb_intern("wait_readable"));
+ sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
}
diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c
index 5455977..4c79326 100644
--- a/ext/socket/basicsocket.c
+++ b/ext/socket/basicsocket.c
@@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
/*
* call-seq:
- * basicsocket.recv_nonblock(maxlen) => mesg
- * basicsocket.recv_nonblock(maxlen, flags) => mesg
+ * basicsocket.recv_nonblock(maxlen [, flags [, options ]]) => mesg
*
* Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after
* O_NONBLOCK is set for the underlying file descriptor.
@@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
* === Parameters
* * +maxlen+ - the number of bytes to receive from the socket
* * +flags+ - zero or more of the +MSG_+ options
+ * * +options+ - keyword hash, supporting `exception: false`
*
* === Example
* serv = TCPServer.new("127.0.0.1", 0)
@@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
* it is extended by IO::WaitReadable.
* So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock.
*
+ * By specifying `exception: false`, the options hash allows you to indicate
+ * that recv_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
+ *
* === See
* * Socket#recvfrom
*/
diff --git a/ext/socket/init.c b/ext/socket/init.c
index 455652d..eadeba9 100644
--- a/ext/socket/init.c
+++ b/ext/socket/init.c
@@ -188,14 +188,19 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
long slen;
int fd, flags;
VALUE addr = Qnil;
+ VALUE opts = Qnil;
socklen_t len0;
+ int ex = 1;
- rb_scan_args(argc, argv, "11", &len, &flg);
+ rb_scan_args(argc, argv, "11:", &len, &flg, &opts);
if (flg == Qnil) flags = 0;
else flags = NUM2INT(flg);
buflen = NUM2INT(len);
+ if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
+ ex = 0;
+
#ifdef MSG_DONTWAIT
/* MSG_DONTWAIT avoids the race condition between fcntl and recvfrom.
It is not portable, though. */
@@ -226,6 +231,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
+ if (!ex)
+ return sym_wait_readable;
rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block");
}
rb_sys_fail("recvfrom(2)");
diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb
index b1959ee..b30a52f 100644
--- a/test/socket/test_nonblock.rb
+++ b/test/socket/test_nonblock.rb
@@ -1,6 +1,7 @@
begin
require "socket"
require "io/nonblock"
+ require "io/wait"
rescue LoadError
end
@@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase
}
end
+ def test_recvfrom_nonblock_no_exception
+ udp_pair do |s1, s2|
+ assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false)
+ s2.send("aaa", 0, s1.getsockname)
+ assert s1.wait_readable
+ mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false)
+ assert_equal(4, inet_addr.length)
+ assert_equal("aaa", mesg)
+ end
+ end
+
if defined?(UNIXSocket) && defined?(Socket::SOCK_SEQPACKET)
def test_sendmsg_nonblock_seqpacket
buf = '*' * 10000
@@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase
rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
end
+
+ def test_sendmsg_nonblock_no_exception
+ buf = '*' * 128
+ UNIXSocket.pair(:SEQPACKET) do |s1, s2|
+ n = 0
+ Timeout.timeout(60) do
+ case rv = s1.sendmsg_nonblock(buf, exception: false)
+ when Integer
+ n += rv
+ when :wait_writable
+ break
+ else
+ flunk "unexpected return value: #{rv.inspect}"
+ end while true
+ assert_equal :wait_writable, rv
+ assert_operator n, :>, 0
+ end
+ end
+ rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
+ skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
+ end
end
def test_recvmsg_nonblock_error
@@ -310,6 +343,16 @@ class TestSocketNonblock < Test::Unit::TestCase
}
end
+ def test_recv_nonblock_no_exception
+ tcp_pair {|c, s|
+ assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+ s.write('HI')
+ assert c.wait_readable
+ assert_equal 'HI', c.recv_nonblock(11, exception: false)
+ assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+ }
+ end
+
def test_connect_nonblock_error
serv = TCPServer.new("127.0.0.1", 0)
_, port, _, _ = serv.addr
--
EW
next reply other threads:[~2015-06-04 22:08 UTC|newest]
Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-06-04 22:08 Eric Wong [this message]
-- strict thread matches above, loose matches on Subject: below --
2015-06-05 23:16 [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg Eric Wong
2015-06-05 0:01 Eric Wong
2015-06-04 22:07 Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1433455726-12175-1-git-send-email-e@80x24.org \
--to=e@80x24.org \
--cc=spew@80x24.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).