dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg
@ 2015-06-04 22:07 Eric Wong
  0 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2015-06-04 22:07 UTC (permalink / raw)
  To: spew

---
 ext/socket/ancdata.c         | 36 +++++++++++++++++++++++++++++-------
 ext/socket/basicsocket.c     |  8 ++++++--
 ext/socket/init.c            |  9 ++++++++-
 test/socket/test_nonblock.rb | 43 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 86 insertions(+), 10 deletions(-)

diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c
index 277a1e8..9594b61 100644
--- a/ext/socket/ancdata.c
+++ b/ext/socket/ancdata.c
@@ -3,6 +3,7 @@
 #include <time.h>
 
 int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
+static VALUE sym_exception, sym_wait_readable, sym_wait_writable;
 
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
 static VALUE rb_cAncillaryData;
@@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     VALUE data, vflags, dest_sockaddr;
     struct msghdr mh;
     struct iovec iov;
+    VALUE opts = Qnil;
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
     VALUE controls = Qnil;
     VALUE controls_str = 0;
@@ -1140,6 +1142,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
 #endif
     int flags;
     ssize_t ss;
+    int ex = 1;
 
     GetOpenFile(sock, fptr);
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1151,11 +1154,15 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     if (argc == 0)
         rb_raise(rb_eArgError, "mesg argument required");
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
-    rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls);
+    rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls,
+                 &opts);
 #else
-    rb_scan_args(argc, argv, "12", &data, &vflags, &dest_sockaddr);
+    rb_scan_args(argc, argv, "12:", &data, &vflags, &dest_sockaddr, &opts);
 #endif
 
+    if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
+	ex = 0;
+
     StringValue(data);
 
     if (!NIL_P(controls)) {
@@ -1287,8 +1294,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
-            rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block");
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+	    if (ex) {
+		rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE,
+			              "sendmsg(2) would block");
+	    }
+	    return sym_wait_writable;
+	}
 	rb_sys_fail("sendmsg(2)");
     }
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1498,6 +1510,7 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     VALUE ret;
     ssize_t ss;
     int request_scm_rights;
+    int ex = 1;
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
     struct cmsghdr *cmh;
     size_t maxctllen;
@@ -1531,8 +1544,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     grow_buffer = NIL_P(vmaxdatlen) || NIL_P(vmaxctllen);
 
     request_scm_rights = 0;
-    if (!NIL_P(vopts) && RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights")))))
-        request_scm_rights = 1;
+    if (!NIL_P(vopts)) {
+	if (RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights")))))
+	    request_scm_rights = 1;
+	if (nonblock && Qfalse == rb_hash_lookup2(vopts, sym_exception, Qundef))
+	    ex = 0;
+    }
 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
     if (request_scm_rights)
         rb_raise(rb_eNotImpError, "control message for recvmsg is unimplemented");
@@ -1608,8 +1625,10 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+            if (ex) return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block");
+	}
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
         if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) {
           /*
@@ -1839,4 +1858,7 @@ rsock_init_ancdata(void)
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0);
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0);
 #endif
+    sym_exception = ID2SYM(rb_intern("exception"));
+    sym_wait_readable = ID2SYM(rb_intern("wait_readable"));
+    sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
 }
diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c
index 5455977..4c79326 100644
--- a/ext/socket/basicsocket.c
+++ b/ext/socket/basicsocket.c
@@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
 
 /*
  * call-seq:
- * 	basicsocket.recv_nonblock(maxlen) => mesg
- * 	basicsocket.recv_nonblock(maxlen, flags) => mesg
+ * 	basicsocket.recv_nonblock(maxlen [, flags [, options ]]) => mesg
  *
  * Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after
  * O_NONBLOCK is set for the underlying file descriptor.
@@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * === Parameters
  * * +maxlen+ - the number of bytes to receive from the socket
  * * +flags+ - zero or more of the +MSG_+ options
+ * * +options+ - keyword hash, supporting `exception: false`
  *
  * === Example
  * 	serv = TCPServer.new("127.0.0.1", 0)
@@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * it is extended by IO::WaitReadable.
  * So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock.
  *
+ * By specifying `exception: false`, the options hash allows you to indicate
+ * that recv_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
+ *
  * === See
  * * Socket#recvfrom
  */
diff --git a/ext/socket/init.c b/ext/socket/init.c
index 455652d..eadeba9 100644
--- a/ext/socket/init.c
+++ b/ext/socket/init.c
@@ -188,14 +188,19 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
     long slen;
     int fd, flags;
     VALUE addr = Qnil;
+    VALUE opts = Qnil;
     socklen_t len0;
+    int ex = 1;
 
-    rb_scan_args(argc, argv, "11", &len, &flg);
+    rb_scan_args(argc, argv, "11:", &len, &flg, &opts);
 
     if (flg == Qnil) flags = 0;
     else             flags = NUM2INT(flg);
     buflen = NUM2INT(len);
 
+    if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
+	ex = 0;
+
 #ifdef MSG_DONTWAIT
     /* MSG_DONTWAIT avoids the race condition between fcntl and recvfrom.
        It is not portable, though. */
@@ -226,6 +231,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
 	  case EWOULDBLOCK:
 #endif
+            if (!ex)
+		return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block");
 	}
 	rb_sys_fail("recvfrom(2)");
diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb
index b1959ee..b30a52f 100644
--- a/test/socket/test_nonblock.rb
+++ b/test/socket/test_nonblock.rb
@@ -1,6 +1,7 @@
 begin
   require "socket"
   require "io/nonblock"
+  require "io/wait"
 rescue LoadError
 end
 
@@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recvfrom_nonblock_no_exception
+    udp_pair do |s1, s2|
+      assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false)
+      s2.send("aaa", 0, s1.getsockname)
+      assert s1.wait_readable
+      mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false)
+      assert_equal(4, inet_addr.length)
+      assert_equal("aaa", mesg)
+    end
+  end
+
   if defined?(UNIXSocket) && defined?(Socket::SOCK_SEQPACKET)
     def test_sendmsg_nonblock_seqpacket
       buf = '*' * 10000
@@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase
     rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
       skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
     end
+
+    def test_sendmsg_nonblock_no_exception
+      buf = '*' * 128
+      UNIXSocket.pair(:SEQPACKET) do |s1, s2|
+        n = 0
+        Timeout.timeout(60) do
+          case rv = s1.sendmsg_nonblock(buf, exception: false)
+          when Integer
+            n += rv
+          when :wait_writable
+            break
+          else
+            flunk "unexpected return value: #{rv.inspect}"
+          end while true
+          assert_equal :wait_writable, rv
+          assert_operator n, :>, 0
+        end
+      end
+    rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
+      skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
+    end
   end
 
   def test_recvmsg_nonblock_error
@@ -310,6 +343,16 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recv_nonblock_no_exception
+    tcp_pair {|c, s|
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+      s.write('HI')
+      assert c.wait_readable
+      assert_equal 'HI', c.recv_nonblock(11, exception: false)
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+    }
+  end
+
   def test_connect_nonblock_error
     serv = TCPServer.new("127.0.0.1", 0)
     _, port, _, _ = serv.addr
-- 
EW


^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg
@ 2015-06-04 22:08 Eric Wong
  0 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2015-06-04 22:08 UTC (permalink / raw)
  To: spew

---
 ext/socket/ancdata.c         | 36 +++++++++++++++++++++++++++++-------
 ext/socket/basicsocket.c     |  8 ++++++--
 ext/socket/init.c            |  9 ++++++++-
 test/socket/test_nonblock.rb | 43 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 86 insertions(+), 10 deletions(-)

diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c
index 277a1e8..9594b61 100644
--- a/ext/socket/ancdata.c
+++ b/ext/socket/ancdata.c
@@ -3,6 +3,7 @@
 #include <time.h>
 
 int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
+static VALUE sym_exception, sym_wait_readable, sym_wait_writable;
 
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
 static VALUE rb_cAncillaryData;
@@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     VALUE data, vflags, dest_sockaddr;
     struct msghdr mh;
     struct iovec iov;
+    VALUE opts = Qnil;
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
     VALUE controls = Qnil;
     VALUE controls_str = 0;
@@ -1140,6 +1142,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
 #endif
     int flags;
     ssize_t ss;
+    int ex = 1;
 
     GetOpenFile(sock, fptr);
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1151,11 +1154,15 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     if (argc == 0)
         rb_raise(rb_eArgError, "mesg argument required");
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
-    rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls);
+    rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls,
+                 &opts);
 #else
-    rb_scan_args(argc, argv, "12", &data, &vflags, &dest_sockaddr);
+    rb_scan_args(argc, argv, "12:", &data, &vflags, &dest_sockaddr, &opts);
 #endif
 
+    if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
+	ex = 0;
+
     StringValue(data);
 
     if (!NIL_P(controls)) {
@@ -1287,8 +1294,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
-            rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block");
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+	    if (ex) {
+		rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE,
+			              "sendmsg(2) would block");
+	    }
+	    return sym_wait_writable;
+	}
 	rb_sys_fail("sendmsg(2)");
     }
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1498,6 +1510,7 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     VALUE ret;
     ssize_t ss;
     int request_scm_rights;
+    int ex = 1;
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
     struct cmsghdr *cmh;
     size_t maxctllen;
@@ -1531,8 +1544,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     grow_buffer = NIL_P(vmaxdatlen) || NIL_P(vmaxctllen);
 
     request_scm_rights = 0;
-    if (!NIL_P(vopts) && RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights")))))
-        request_scm_rights = 1;
+    if (!NIL_P(vopts)) {
+	if (RTEST(rb_hash_aref(vopts, ID2SYM(rb_intern("scm_rights")))))
+	    request_scm_rights = 1;
+	if (nonblock && Qfalse == rb_hash_lookup2(vopts, sym_exception, Qundef))
+	    ex = 0;
+    }
 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
     if (request_scm_rights)
         rb_raise(rb_eNotImpError, "control message for recvmsg is unimplemented");
@@ -1608,8 +1625,10 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+            if (ex) return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block");
+	}
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
         if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) {
           /*
@@ -1839,4 +1858,7 @@ rsock_init_ancdata(void)
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0);
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0);
 #endif
+    sym_exception = ID2SYM(rb_intern("exception"));
+    sym_wait_readable = ID2SYM(rb_intern("wait_readable"));
+    sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
 }
diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c
index 5455977..4c79326 100644
--- a/ext/socket/basicsocket.c
+++ b/ext/socket/basicsocket.c
@@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
 
 /*
  * call-seq:
- * 	basicsocket.recv_nonblock(maxlen) => mesg
- * 	basicsocket.recv_nonblock(maxlen, flags) => mesg
+ * 	basicsocket.recv_nonblock(maxlen [, flags [, options ]]) => mesg
  *
  * Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after
  * O_NONBLOCK is set for the underlying file descriptor.
@@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * === Parameters
  * * +maxlen+ - the number of bytes to receive from the socket
  * * +flags+ - zero or more of the +MSG_+ options
+ * * +options+ - keyword hash, supporting `exception: false`
  *
  * === Example
  * 	serv = TCPServer.new("127.0.0.1", 0)
@@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * it is extended by IO::WaitReadable.
  * So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock.
  *
+ * By specifying `exception: false`, the options hash allows you to indicate
+ * that recv_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
+ *
  * === See
  * * Socket#recvfrom
  */
diff --git a/ext/socket/init.c b/ext/socket/init.c
index 455652d..eadeba9 100644
--- a/ext/socket/init.c
+++ b/ext/socket/init.c
@@ -188,14 +188,19 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
     long slen;
     int fd, flags;
     VALUE addr = Qnil;
+    VALUE opts = Qnil;
     socklen_t len0;
+    int ex = 1;
 
-    rb_scan_args(argc, argv, "11", &len, &flg);
+    rb_scan_args(argc, argv, "11:", &len, &flg, &opts);
 
     if (flg == Qnil) flags = 0;
     else             flags = NUM2INT(flg);
     buflen = NUM2INT(len);
 
+    if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
+	ex = 0;
+
 #ifdef MSG_DONTWAIT
     /* MSG_DONTWAIT avoids the race condition between fcntl and recvfrom.
        It is not portable, though. */
@@ -226,6 +231,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
 	  case EWOULDBLOCK:
 #endif
+            if (!ex)
+		return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block");
 	}
 	rb_sys_fail("recvfrom(2)");
diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb
index b1959ee..b30a52f 100644
--- a/test/socket/test_nonblock.rb
+++ b/test/socket/test_nonblock.rb
@@ -1,6 +1,7 @@
 begin
   require "socket"
   require "io/nonblock"
+  require "io/wait"
 rescue LoadError
 end
 
@@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recvfrom_nonblock_no_exception
+    udp_pair do |s1, s2|
+      assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false)
+      s2.send("aaa", 0, s1.getsockname)
+      assert s1.wait_readable
+      mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false)
+      assert_equal(4, inet_addr.length)
+      assert_equal("aaa", mesg)
+    end
+  end
+
   if defined?(UNIXSocket) && defined?(Socket::SOCK_SEQPACKET)
     def test_sendmsg_nonblock_seqpacket
       buf = '*' * 10000
@@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase
     rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
       skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
     end
+
+    def test_sendmsg_nonblock_no_exception
+      buf = '*' * 128
+      UNIXSocket.pair(:SEQPACKET) do |s1, s2|
+        n = 0
+        Timeout.timeout(60) do
+          case rv = s1.sendmsg_nonblock(buf, exception: false)
+          when Integer
+            n += rv
+          when :wait_writable
+            break
+          else
+            flunk "unexpected return value: #{rv.inspect}"
+          end while true
+          assert_equal :wait_writable, rv
+          assert_operator n, :>, 0
+        end
+      end
+    rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
+      skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
+    end
   end
 
   def test_recvmsg_nonblock_error
@@ -310,6 +343,16 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recv_nonblock_no_exception
+    tcp_pair {|c, s|
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+      s.write('HI')
+      assert c.wait_readable
+      assert_equal 'HI', c.recv_nonblock(11, exception: false)
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+    }
+  end
+
   def test_connect_nonblock_error
     serv = TCPServer.new("127.0.0.1", 0)
     _, port, _, _ = serv.addr
-- 
EW


^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg
@ 2015-06-05  0:01 Eric Wong
  0 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2015-06-05  0:01 UTC (permalink / raw)
  To: spew

As documented before, exceptions are expensive and IO::Wait*able are too
common in socket applications to be the exceptional case.  Datagram
sockets deserve the same API which stream sockets are allowed with
read_nonblock and write_nonblock.
---
 ext/socket/ancdata.c         | 33 +++++++++++++++++++++++++++------
 ext/socket/basicsocket.c     |  8 ++++++--
 ext/socket/init.c            | 11 +++++------
 ext/socket/rubysocket.h      |  8 ++++++++
 ext/socket/socket.c          |  6 ++----
 ext/socket/udpsocket.c       |  8 ++++++--
 test/socket/test_nonblock.rb | 44 ++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c
index 277a1e8..369ac95 100644
--- a/ext/socket/ancdata.c
+++ b/ext/socket/ancdata.c
@@ -3,6 +3,7 @@
 #include <time.h>
 
 int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
+static VALUE sym_exception, sym_wait_readable, sym_wait_writable;
 
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
 static VALUE rb_cAncillaryData;
@@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     VALUE data, vflags, dest_sockaddr;
     struct msghdr mh;
     struct iovec iov;
+    VALUE opts = Qnil;
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
     VALUE controls = Qnil;
     VALUE controls_str = 0;
@@ -1151,9 +1153,10 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     if (argc == 0)
         rb_raise(rb_eArgError, "mesg argument required");
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
-    rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls);
+    rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls,
+                 &opts);
 #else
-    rb_scan_args(argc, argv, "12", &data, &vflags, &dest_sockaddr);
+    rb_scan_args(argc, argv, "12:", &data, &vflags, &dest_sockaddr, &opts);
 #endif
 
     StringValue(data);
@@ -1287,8 +1290,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
-            rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block");
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+	    if (rsock_opt_false_p(opts, sym_exception)) {
+		return sym_wait_writable;
+	    }
+	    rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE,
+				  "sendmsg(2) would block");
+	}
 	rb_sys_fail("sendmsg(2)");
     }
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1342,7 +1350,7 @@ rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock)
 #if defined(HAVE_SENDMSG)
 /*
  * call-seq:
- *    basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls) => numbytes_sent
+ *    basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls, opts={}) => numbytes_sent
  *
  * sendmsg_nonblock sends a message using sendmsg(2) system call in non-blocking manner.
  *
@@ -1350,6 +1358,9 @@ rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock)
  * but the non-blocking flag is set before the system call
  * and it doesn't retry the system call.
  *
+ * By specifying `exception: false`, the _opts_ hash allows you to indicate
+ * that sendmsg_nonblock should not raise an IO::WaitWritable exception, but
+ * return the symbol :wait_writable instead.
  */
 VALUE
 rsock_bsock_sendmsg_nonblock(int argc, VALUE *argv, VALUE sock)
@@ -1608,8 +1619,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+            if (rsock_opt_false_p(vopts, sym_exception)) {
+                return sym_wait_readable;
+            }
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block");
+        }
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
         if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) {
           /*
@@ -1794,6 +1809,9 @@ rsock_bsock_recvmsg(int argc, VALUE *argv, VALUE sock)
  * but non-blocking flag is set before the system call
  * and it doesn't retry the system call.
  *
+ * By specifying `exception: false`, the _opts_ hash allows you to indicate
+ * that recvmsg_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
  */
 VALUE
 rsock_bsock_recvmsg_nonblock(int argc, VALUE *argv, VALUE sock)
@@ -1839,4 +1857,7 @@ rsock_init_ancdata(void)
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0);
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0);
 #endif
+    sym_exception = ID2SYM(rb_intern("exception"));
+    sym_wait_readable = ID2SYM(rb_intern("wait_readable"));
+    sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
 }
diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c
index 5455977..4c79326 100644
--- a/ext/socket/basicsocket.c
+++ b/ext/socket/basicsocket.c
@@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
 
 /*
  * call-seq:
- * 	basicsocket.recv_nonblock(maxlen) => mesg
- * 	basicsocket.recv_nonblock(maxlen, flags) => mesg
+ * 	basicsocket.recv_nonblock(maxlen [, flags [, options ]]) => mesg
  *
  * Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after
  * O_NONBLOCK is set for the underlying file descriptor.
@@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * === Parameters
  * * +maxlen+ - the number of bytes to receive from the socket
  * * +flags+ - zero or more of the +MSG_+ options
+ * * +options+ - keyword hash, supporting `exception: false`
  *
  * === Example
  * 	serv = TCPServer.new("127.0.0.1", 0)
@@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * it is extended by IO::WaitReadable.
  * So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock.
  *
+ * By specifying `exception: false`, the options hash allows you to indicate
+ * that recv_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
+ *
  * === See
  * * Socket#recvfrom
  */
diff --git a/ext/socket/init.c b/ext/socket/init.c
index 455652d..5f0d445 100644
--- a/ext/socket/init.c
+++ b/ext/socket/init.c
@@ -188,9 +188,10 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
     long slen;
     int fd, flags;
     VALUE addr = Qnil;
+    VALUE opts = Qnil;
     socklen_t len0;
 
-    rb_scan_args(argc, argv, "11", &len, &flg);
+    rb_scan_args(argc, argv, "11:", &len, &flg, &opts);
 
     if (flg == Qnil) flags = 0;
     else             flags = NUM2INT(flg);
@@ -226,6 +227,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
 	  case EWOULDBLOCK:
 #endif
+            if (rsock_opt_false_p(opts, sym_exception))
+		return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block");
 	}
 	rb_sys_fail("recvfrom(2)");
@@ -528,14 +531,10 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr,
 			struct sockaddr *sockaddr, socklen_t *len)
 {
     int fd2;
-    int ex = 1;
     VALUE opts = Qnil;
 
     rb_scan_args(argc, argv, "0:", &opts);
 
-    if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
-	ex = 0;
-
     rb_secure(3);
     rb_io_set_nonblock(fptr);
     fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1);
@@ -549,7 +548,7 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr,
 #if defined EPROTO
 	  case EPROTO:
 #endif
-            if (!ex)
+            if (rsock_opt_false_p(opts, sym_exception))
 		return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "accept(2) would block");
 	}
diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h
index 359d28e..d03b1c5 100644
--- a/ext/socket/rubysocket.h
+++ b/ext/socket/rubysocket.h
@@ -423,4 +423,12 @@ static inline void rsock_maybe_wait_fd(int fd) { }
 #  define MSG_DONTWAIT_RELIABLE 0
 #endif
 
+static inline int
+rsock_opt_false_p(VALUE opt, VALUE sym)
+{
+    if (!NIL_P(opt) && Qfalse == rb_hash_lookup2(opt, sym, Qundef))
+	return 1;
+    return 0;
+}
+
 #endif
diff --git a/ext/socket/socket.c b/ext/socket/socket.c
index bb23703..35665e5 100644
--- a/ext/socket/socket.c
+++ b/ext/socket/socket.c
@@ -503,15 +503,13 @@ sock_connect_nonblock(int argc, VALUE *argv, VALUE sock)
     n = connect(fptr->fd, (struct sockaddr*)RSTRING_PTR(addr), RSTRING_SOCKLEN(addr));
     if (n < 0) {
         if (errno == EINPROGRESS) {
-            if (!NIL_P(opts) &&
-                    Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) {
+	    if (rsock_opt_false_p(opts, sym_exception)) {
                 return sym_wait_writable;
             }
             rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "connect(2) would block");
 	}
 	if (errno == EISCONN) {
-            if (!NIL_P(opts) &&
-                    Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) {
+	    if (rsock_opt_false_p(opts, sym_exception)) {
                 return INT2FIX(0);
             }
 	}
diff --git a/ext/socket/udpsocket.c b/ext/socket/udpsocket.c
index eb605ca..86d18f4 100644
--- a/ext/socket/udpsocket.c
+++ b/ext/socket/udpsocket.c
@@ -194,8 +194,7 @@ udp_send(int argc, VALUE *argv, VALUE sock)
 
 /*
  * call-seq:
- *   udpsocket.recvfrom_nonblock(maxlen) => [mesg, sender_inet_addr]
- *   udpsocket.recvfrom_nonblock(maxlen, flags) => [mesg, sender_inet_addr]
+ *   udpsocket.recvfrom_nonblock(maxlen [, flags [, options]]) => [mesg, sender_inet_addr]
  *
  * Receives up to _maxlen_ bytes from +udpsocket+ using recvfrom(2) after
  * O_NONBLOCK is set for the underlying file descriptor.
@@ -211,6 +210,7 @@ udp_send(int argc, VALUE *argv, VALUE sock)
  * === Parameters
  * * +maxlen+ - the number of bytes to receive from the socket
  * * +flags+ - zero or more of the +MSG_+ options
+ * * +options+ - keyword hash, supporting `exception: false`
  *
  * === Example
  * 	require 'socket'
@@ -238,6 +238,10 @@ udp_send(int argc, VALUE *argv, VALUE sock)
  * it is extended by IO::WaitReadable.
  * So IO::WaitReadable can be used to rescue the exceptions for retrying recvfrom_nonblock.
  *
+ * By specifying `exception: false`, the options hash allows you to indicate
+ * that recvfrom_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
+ *
  * === See
  * * Socket#recvfrom
  */
diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb
index b1959ee..316ec5f 100644
--- a/test/socket/test_nonblock.rb
+++ b/test/socket/test_nonblock.rb
@@ -1,6 +1,7 @@
 begin
   require "socket"
   require "io/nonblock"
+  require "io/wait"
 rescue LoadError
 end
 
@@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recvfrom_nonblock_no_exception
+    udp_pair do |s1, s2|
+      assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false)
+      s2.send("aaa", 0, s1.getsockname)
+      assert s1.wait_readable
+      mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false)
+      assert_equal(4, inet_addr.length)
+      assert_equal("aaa", mesg)
+    end
+  end
+
   if defined?(UNIXSocket) && defined?(Socket::SOCK_SEQPACKET)
     def test_sendmsg_nonblock_seqpacket
       buf = '*' * 10000
@@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase
     rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
       skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
     end
+
+    def test_sendmsg_nonblock_no_exception
+      buf = '*' * 128
+      UNIXSocket.pair(:SEQPACKET) do |s1, s2|
+        n = 0
+        Timeout.timeout(60) do
+          case rv = s1.sendmsg_nonblock(buf, exception: false)
+          when Integer
+            n += rv
+          when :wait_writable
+            break
+          else
+            flunk "unexpected return value: #{rv.inspect}"
+          end while true
+          assert_equal :wait_writable, rv
+          assert_operator n, :>, 0
+        end
+      end
+    rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
+      skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
+    end
   end
 
   def test_recvmsg_nonblock_error
@@ -297,6 +330,7 @@ class TestSocketNonblock < Test::Unit::TestCase
       rescue Errno::EWOULDBLOCK
         assert_kind_of(IO::WaitReadable, $!)
       end
+      assert_equal :wait_readable, s1.recvmsg_nonblock(11, exception: false)
     }
   end
 
@@ -310,6 +344,16 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recv_nonblock_no_exception
+    tcp_pair {|c, s|
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+      s.write('HI')
+      assert c.wait_readable
+      assert_equal 'HI', c.recv_nonblock(11, exception: false)
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+    }
+  end
+
   def test_connect_nonblock_error
     serv = TCPServer.new("127.0.0.1", 0)
     _, port, _, _ = serv.addr
-- 
EW


^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg
@ 2015-06-05 23:16 Eric Wong
  0 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2015-06-05 23:16 UTC (permalink / raw)
  To: spew

As documented before, exceptions are expensive and IO::Wait*able are too
common in socket applications to be the exceptional case.  Datagram
sockets deserve the same API which stream sockets are allowed with
read_nonblock and write_nonblock.

Note: this does not offer a performance advantage under optimal
conditions when both ends are equally matched in speed, but it
does make debug output cleaner by avoiding exceptions whenever
the receiver slows down.
---
 ext/socket/ancdata.c         | 32 +++++++++++++++++++++++++++-----
 ext/socket/basicsocket.c     |  8 ++++++--
 ext/socket/init.c            | 11 +++++------
 ext/socket/rubysocket.h      |  8 ++++++++
 ext/socket/socket.c          |  6 ++----
 ext/socket/udpsocket.c       |  8 ++++++--
 test/socket/test_nonblock.rb | 44 ++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c
index 756cf7e..767a0bc 100644
--- a/ext/socket/ancdata.c
+++ b/ext/socket/ancdata.c
@@ -3,6 +3,7 @@
 #include <time.h>
 
 int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
+static VALUE sym_exception, sym_wait_readable, sym_wait_writable;
 
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
 static VALUE rb_cAncillaryData;
@@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     VALUE data, vflags, dest_sockaddr;
     struct msghdr mh;
     struct iovec iov;
+    VALUE opts = Qnil;
     VALUE controls = Qnil;
     int controls_num;
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1152,7 +1154,8 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
     if (argc == 0)
         rb_raise(rb_eArgError, "mesg argument required");
 
-    rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls);
+    rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls,
+                 &opts);
 
     StringValue(data);
     controls_num = RARRAY_LENINT(controls);
@@ -1285,8 +1288,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
-            rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block");
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+	    if (rsock_opt_false_p(opts, sym_exception)) {
+		return sym_wait_writable;
+	    }
+	    rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE,
+				  "sendmsg(2) would block");
+	}
 	rb_sys_fail("sendmsg(2)");
     }
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1340,7 +1348,7 @@ rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock)
 #if defined(HAVE_SENDMSG)
 /*
  * call-seq:
- *    basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls) => numbytes_sent
+ *    basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls, opts={}) => numbytes_sent
  *
  * sendmsg_nonblock sends a message using sendmsg(2) system call in non-blocking manner.
  *
@@ -1348,6 +1356,9 @@ rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock)
  * but the non-blocking flag is set before the system call
  * and it doesn't retry the system call.
  *
+ * By specifying `exception: false`, the _opts_ hash allows you to indicate
+ * that sendmsg_nonblock should not raise an IO::WaitWritable exception, but
+ * return the symbol :wait_writable instead.
  */
 VALUE
 rsock_bsock_sendmsg_nonblock(int argc, VALUE *argv, VALUE sock)
@@ -1606,8 +1617,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock)
             rb_io_check_closed(fptr);
             goto retry;
         }
-        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN))
+        if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) {
+            if (rsock_opt_false_p(vopts, sym_exception)) {
+                return sym_wait_readable;
+            }
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block");
+        }
 #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
         if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) {
           /*
@@ -1792,6 +1807,9 @@ rsock_bsock_recvmsg(int argc, VALUE *argv, VALUE sock)
  * but non-blocking flag is set before the system call
  * and it doesn't retry the system call.
  *
+ * By specifying `exception: false`, the _opts_ hash allows you to indicate
+ * that recvmsg_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
  */
 VALUE
 rsock_bsock_recvmsg_nonblock(int argc, VALUE *argv, VALUE sock)
@@ -1837,4 +1855,8 @@ rsock_init_ancdata(void)
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0);
     rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0);
 #endif
+#undef rb_intern
+    sym_exception = ID2SYM(rb_intern("exception"));
+    sym_wait_readable = ID2SYM(rb_intern("wait_readable"));
+    sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
 }
diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c
index 5455977..4c79326 100644
--- a/ext/socket/basicsocket.c
+++ b/ext/socket/basicsocket.c
@@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
 
 /*
  * call-seq:
- * 	basicsocket.recv_nonblock(maxlen) => mesg
- * 	basicsocket.recv_nonblock(maxlen, flags) => mesg
+ * 	basicsocket.recv_nonblock(maxlen [, flags [, options ]]) => mesg
  *
  * Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after
  * O_NONBLOCK is set for the underlying file descriptor.
@@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * === Parameters
  * * +maxlen+ - the number of bytes to receive from the socket
  * * +flags+ - zero or more of the +MSG_+ options
+ * * +options+ - keyword hash, supporting `exception: false`
  *
  * === Example
  * 	serv = TCPServer.new("127.0.0.1", 0)
@@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock)
  * it is extended by IO::WaitReadable.
  * So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock.
  *
+ * By specifying `exception: false`, the options hash allows you to indicate
+ * that recv_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
+ *
  * === See
  * * Socket#recvfrom
  */
diff --git a/ext/socket/init.c b/ext/socket/init.c
index 455652d..5f0d445 100644
--- a/ext/socket/init.c
+++ b/ext/socket/init.c
@@ -188,9 +188,10 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
     long slen;
     int fd, flags;
     VALUE addr = Qnil;
+    VALUE opts = Qnil;
     socklen_t len0;
 
-    rb_scan_args(argc, argv, "11", &len, &flg);
+    rb_scan_args(argc, argv, "11:", &len, &flg, &opts);
 
     if (flg == Qnil) flags = 0;
     else             flags = NUM2INT(flg);
@@ -226,6 +227,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type
 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
 	  case EWOULDBLOCK:
 #endif
+            if (rsock_opt_false_p(opts, sym_exception))
+		return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block");
 	}
 	rb_sys_fail("recvfrom(2)");
@@ -528,14 +531,10 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr,
 			struct sockaddr *sockaddr, socklen_t *len)
 {
     int fd2;
-    int ex = 1;
     VALUE opts = Qnil;
 
     rb_scan_args(argc, argv, "0:", &opts);
 
-    if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef))
-	ex = 0;
-
     rb_secure(3);
     rb_io_set_nonblock(fptr);
     fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1);
@@ -549,7 +548,7 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr,
 #if defined EPROTO
 	  case EPROTO:
 #endif
-            if (!ex)
+            if (rsock_opt_false_p(opts, sym_exception))
 		return sym_wait_readable;
             rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "accept(2) would block");
 	}
diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h
index 359d28e..d03b1c5 100644
--- a/ext/socket/rubysocket.h
+++ b/ext/socket/rubysocket.h
@@ -423,4 +423,12 @@ static inline void rsock_maybe_wait_fd(int fd) { }
 #  define MSG_DONTWAIT_RELIABLE 0
 #endif
 
+static inline int
+rsock_opt_false_p(VALUE opt, VALUE sym)
+{
+    if (!NIL_P(opt) && Qfalse == rb_hash_lookup2(opt, sym, Qundef))
+	return 1;
+    return 0;
+}
+
 #endif
diff --git a/ext/socket/socket.c b/ext/socket/socket.c
index bb23703..35665e5 100644
--- a/ext/socket/socket.c
+++ b/ext/socket/socket.c
@@ -503,15 +503,13 @@ sock_connect_nonblock(int argc, VALUE *argv, VALUE sock)
     n = connect(fptr->fd, (struct sockaddr*)RSTRING_PTR(addr), RSTRING_SOCKLEN(addr));
     if (n < 0) {
         if (errno == EINPROGRESS) {
-            if (!NIL_P(opts) &&
-                    Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) {
+	    if (rsock_opt_false_p(opts, sym_exception)) {
                 return sym_wait_writable;
             }
             rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "connect(2) would block");
 	}
 	if (errno == EISCONN) {
-            if (!NIL_P(opts) &&
-                    Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) {
+	    if (rsock_opt_false_p(opts, sym_exception)) {
                 return INT2FIX(0);
             }
 	}
diff --git a/ext/socket/udpsocket.c b/ext/socket/udpsocket.c
index eb605ca..86d18f4 100644
--- a/ext/socket/udpsocket.c
+++ b/ext/socket/udpsocket.c
@@ -194,8 +194,7 @@ udp_send(int argc, VALUE *argv, VALUE sock)
 
 /*
  * call-seq:
- *   udpsocket.recvfrom_nonblock(maxlen) => [mesg, sender_inet_addr]
- *   udpsocket.recvfrom_nonblock(maxlen, flags) => [mesg, sender_inet_addr]
+ *   udpsocket.recvfrom_nonblock(maxlen [, flags [, options]]) => [mesg, sender_inet_addr]
  *
  * Receives up to _maxlen_ bytes from +udpsocket+ using recvfrom(2) after
  * O_NONBLOCK is set for the underlying file descriptor.
@@ -211,6 +210,7 @@ udp_send(int argc, VALUE *argv, VALUE sock)
  * === Parameters
  * * +maxlen+ - the number of bytes to receive from the socket
  * * +flags+ - zero or more of the +MSG_+ options
+ * * +options+ - keyword hash, supporting `exception: false`
  *
  * === Example
  * 	require 'socket'
@@ -238,6 +238,10 @@ udp_send(int argc, VALUE *argv, VALUE sock)
  * it is extended by IO::WaitReadable.
  * So IO::WaitReadable can be used to rescue the exceptions for retrying recvfrom_nonblock.
  *
+ * By specifying `exception: false`, the options hash allows you to indicate
+ * that recvfrom_nonblock should not raise an IO::WaitReadable exception, but
+ * return the symbol :wait_readable instead.
+ *
  * === See
  * * Socket#recvfrom
  */
diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb
index b1959ee..316ec5f 100644
--- a/test/socket/test_nonblock.rb
+++ b/test/socket/test_nonblock.rb
@@ -1,6 +1,7 @@
 begin
   require "socket"
   require "io/nonblock"
+  require "io/wait"
 rescue LoadError
 end
 
@@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recvfrom_nonblock_no_exception
+    udp_pair do |s1, s2|
+      assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false)
+      s2.send("aaa", 0, s1.getsockname)
+      assert s1.wait_readable
+      mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false)
+      assert_equal(4, inet_addr.length)
+      assert_equal("aaa", mesg)
+    end
+  end
+
   if defined?(UNIXSocket) && defined?(Socket::SOCK_SEQPACKET)
     def test_sendmsg_nonblock_seqpacket
       buf = '*' * 10000
@@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase
     rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
       skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
     end
+
+    def test_sendmsg_nonblock_no_exception
+      buf = '*' * 128
+      UNIXSocket.pair(:SEQPACKET) do |s1, s2|
+        n = 0
+        Timeout.timeout(60) do
+          case rv = s1.sendmsg_nonblock(buf, exception: false)
+          when Integer
+            n += rv
+          when :wait_writable
+            break
+          else
+            flunk "unexpected return value: #{rv.inspect}"
+          end while true
+          assert_equal :wait_writable, rv
+          assert_operator n, :>, 0
+        end
+      end
+    rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT
+      skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}"
+    end
   end
 
   def test_recvmsg_nonblock_error
@@ -297,6 +330,7 @@ class TestSocketNonblock < Test::Unit::TestCase
       rescue Errno::EWOULDBLOCK
         assert_kind_of(IO::WaitReadable, $!)
       end
+      assert_equal :wait_readable, s1.recvmsg_nonblock(11, exception: false)
     }
   end
 
@@ -310,6 +344,16 @@ class TestSocketNonblock < Test::Unit::TestCase
     }
   end
 
+  def test_recv_nonblock_no_exception
+    tcp_pair {|c, s|
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+      s.write('HI')
+      assert c.wait_readable
+      assert_equal 'HI', c.recv_nonblock(11, exception: false)
+      assert_equal :wait_readable, c.recv_nonblock(11, exception: false)
+    }
+  end
+
   def test_connect_nonblock_error
     serv = TCPServer.new("127.0.0.1", 0)
     _, port, _, _ = serv.addr
-- 
EW


^ permalink raw reply related	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2015-06-05 23:16 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-06-05  0:01 [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg Eric Wong
  -- strict thread matches above, loose matches on Subject: below --
2015-06-05 23:16 Eric Wong
2015-06-04 22:08 Eric Wong
2015-06-04 22:07 Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).