dtas.git  about / heads / tags
duct tape audio suite for *nix
blob e53645928b0b5422f4f1985f0bfd7b9afa388b26 1928 bytes (raw)
$ git show mpd:test/test_mpd_emu.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
 
# Copyright (C) 2013-2016 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require_relative 'helper'
require 'socket'
require 'dtas/mpd_emu_client'
require 'dtas/server_loop'

class TestMpdEmu < Testcase
  class Quit
    attr_reader :to_io

    def initialize
      @to_io, @w = IO.pipe
    end

    def quit!
      @w.close
    end

    def accept_nonblock(*args)
      Thread.exit
    end
  end

  def setup
    @host = '127.0.0.1'
    @l = TCPServer.new(@host, 0)
    @port = @l.addr[1]
    @quit = Quit.new
    @klass = Class.new(DTAS::MpdEmuClient)
    @svc = DTAS::ServerLoop.new([@l, @quit], @klass)
    @th = Thread.new { @svc.run_forever }
    @c = TCPSocket.new(@host, @port)
    assert_match %r{\AOK.*MPD.*\n\z}, @c.gets
  end

  def teardown
    @quit.quit!
    @th.join
    @quit.to_io.close
    @l.close
    @c.close unless @c.closed?
  end

  def test_ping
    @c.write "ping\n"
    assert_equal "OK\n", @c.gets
    assert_nil IO.select([@c], nil, nil, 0)
    @c.write "close\n"
    assert_nil @c.read(1)
  end

  def test_bad_list_states
    %w(command_list_end).each do |cmd|
      @c.write "#{cmd}\n"
      assert_equal %Q![5@0 {} unknown command "#{cmd}"\n!, @c.gets
      assert_nil IO.select([@c], nil, nil, 0)
    end
  end

  # to ensure output buffering works:
  module BigOutput
    WAKE = IO.pipe
    NR = 20000
    OMG = ('OMG! ' * 99) << "OMG!\n"
    def mpdcmd_big_output(*_)
      rv = true
      NR.times { rv = out(OMG) }
      # tell the tester we're done writing to our buffer:
      WAKE[1].write(rv == :wait_writable ? '.' : 'F')
      rv
    end
  end

  def test_big_output
    @klass.__send__(:include, BigOutput)
    @c.write "big_output\n"
    assert_equal '.', BigOutput::WAKE[0].read(1), 'server blocked on write'
    BigOutput::NR.times do
      assert_equal BigOutput::OMG, @c.gets
    end
  end
end

git clone git://80x24.org/dtas.git
git clone https://80x24.org/dtas.git