about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/dtas/buffer/splice.rb4
-rw-r--r--lib/dtas/player.rb8
-rw-r--r--lib/dtas/writable_iter.rb16
-rw-r--r--test/test_buffer.rb5
4 files changed, 32 insertions, 1 deletions
diff --git a/lib/dtas/buffer/splice.rb b/lib/dtas/buffer/splice.rb
index 9093860..2a7e9a4 100644
--- a/lib/dtas/buffer/splice.rb
+++ b/lib/dtas/buffer/splice.rb
@@ -69,7 +69,9 @@ module DTAS::Buffer::Splice # :nodoc:
   end
 
   def broadcast_inf(targets)
-    if targets.none?(&:nonblock?)
+    if targets.all?(&:ready_write_optimized?)
+      blocked = []
+    elsif targets.none?(&:nonblock?)
       # if all targets are blocking, don't start until they're all writable
       r = IO.select(nil, targets, nil, 0) or return targets
       blocked = targets - r[1]
diff --git a/lib/dtas/player.rb b/lib/dtas/player.rb
index 56fbdac..04c03b0 100644
--- a/lib/dtas/player.rb
+++ b/lib/dtas/player.rb
@@ -300,6 +300,13 @@ class DTAS::Player # :nodoc:
     end
   end
 
+  def _optimize_write_prepare(targets)
+    targets.each do |dst|
+      dst.wait_writable_prepare
+      @srv.wait_ctl(dst, :wait_writable)
+    end
+  end
+
   # returns a wait_ctl arg for self
   def broadcast_iter(buf, targets)
     case rv = buf.broadcast(targets)
@@ -315,6 +322,7 @@ class DTAS::Player # :nodoc:
       # via DTAS::Sink#writable_iter
       :ignore
     else # :wait_readable or nil
+      _optimize_write_prepare(targets)
       rv
     end
   end
diff --git a/lib/dtas/writable_iter.rb b/lib/dtas/writable_iter.rb
index 1fd65eb..2355352 100644
--- a/lib/dtas/writable_iter.rb
+++ b/lib/dtas/writable_iter.rb
@@ -4,9 +4,24 @@ require_relative '../dtas'
 
 module DTAS::WritableIter # :nodoc:
   attr_accessor :on_writable
+  # we may use the ready_write flag to avoid an extra IO.select
+  attr_accessor :ready_write
 
   def writable_iter_init
+    @mark_writable = proc { @ready_write = true }
     @on_writable = nil
+    @ready_write = true
+  end
+
+  def ready_write_optimized?
+    rv = @ready_write
+    @ready_write = false
+    rv
+  end
+
+  def wait_writable_prepare
+    @ready_write = false
+    @on_writable ||= @mark_writable
   end
 
   # this is used to exchange our own writable status for the readable
@@ -14,6 +29,7 @@ module DTAS::WritableIter # :nodoc:
   def writable_iter
     if owr = @on_writable
       @on_writable = nil
+      @ready_write = true
       owr.call # this triggers readability watching of DTAS::Buffer
     end
     :ignore
diff --git a/test/test_buffer.rb b/test/test_buffer.rb
index 13f4352..d41d07c 100644
--- a/test/test_buffer.rb
+++ b/test/test_buffer.rb
@@ -15,6 +15,11 @@ class TestBuffer < Testcase
 
   def pipe
     ret = IO.pipe
+    ret.each do |x|
+      def x.ready_write_optimized?
+        false
+      end
+    end
     @to_close.concat(ret)
     ret
   end