--ZGiS0Q5IWpPtfppv
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
Oh.. hehe... No, it's intended to be connected to a remote
socket. (Or at least, to a socket whose other end is
connected to a different process.)
I suppose in practice it's a convenience mechanism. It lets
my main thread blast an arbitrarily large chunk of data at
the BufferedIO#send_nonblock without having to wait. The main
thread can go about its business, knowing that data will be
sent by BufferedIO to the remote host as fast as it can be.
Oh I see now.
Well, I can make my code have an API like yours, but it seems very unnatural
to me. For exmaple, #recv_nonblock always returns "" if no data is available
- but you cannot do a select() on a BufferedIO object, so you have to waste
time polling it.
My code fails your unit tests with some deadlock problem. I don't understand
what timed-wait is doing, so I can't really fix it. I've never found I've
had to use Thread.stop and the like; Ruby's primitives like Mutex, Queue,
SizedQueue, and Timeout always seem to do the job.
I think what I'm saying is: there's a more natural way to do it in Ruby.
I don't know if in the end it will be beneficial, or if it
would be better to just rewrite the app to be completely
multi-threaded, and get rid of this main thread dispatch
notion entirely. I have always had a love-hate relationship
with both multithreading, and single-threaded select() dispatch
loops. <grin>
For me, it's now a love-love relationship. I've seen programs which consist
of pages and pages of C, implementing select() across arrays of
filedescriptors and an array of state machines for each socket; then I've
rewritten it into a couple of screens of Ruby using Threads. The Ruby code
works far better, as it's less buggy
Regards,
Brian.
--ZGiS0Q5IWpPtfppv
Content-Type: text/plain; charset=us-ascii
Content-Disposition: attachment; filename="buffered-io2.rb"
require 'thread'
require 'global-signal'
class BufferedIO
def initialize(sock, data_ready_signal = nil)
@sock = sock
@rd_queue = Queue.new
@wr_queue = Queue.new
@data_ready_signal = data_ready_signal
@eof = false
@rd_thread = Thread.new { background_read }
@wr_thread = Thread.new { background_write }
end
def recv_nonblock
return "" if @rd_queue.empty?
@rd_queue.pop
end
def recv_ready?
not @rd_queue.empty?
end
def send_nonblock(str)
@wr_queue.push(str)
end
def write_pending?
not @wr.queue.empty?
end
def peeraddr
@sock.peeraddr
end
def eof
@eof and @rd_queue.empty?
end
def close
# flush queues??
@sock.close
@rd_thread.join
@wr_thread.join
end
protected
def background_read
while true
dat = @sock.recv(65536)
break if dat.nil? or dat.empty?
@rd_queue.push(dat)
@data_ready_signal.signal if @data_ready_signal
end
@eof = true
@rd_queue.push(nil)
end
def background_write
while dat = @wr_queue.pop
@sock.write(dat)
end
end
end
# Same Unit tests as before
if $0 == __FILE__
require 'test/unit'
require 'socket'
TEST_HOST = 'localhost'
TEST_PORT = 12345
class TestBufferedIO < Test::Unit::TestCase
def test_bio
server = TCPServer.new(TEST_PORT)
client = TCPSocket.new(TEST_HOST, TEST_PORT)
sv_client = server.accept
global_signal = GlobalSignal.new
bio = BufferedIO.new(client, global_signal)
assert( ! bio.eof )
assert( ! bio.recv_ready? )
assert_equal( "", bio.recv_nonblock )
# send some data... wait for signal...
sv_client.print("kazango!\n")
global_signal.wait
assert( bio.recv_ready? )
assert_equal( "kazango!\n", bio.recv_nonblock )
assert( ! bio.recv_ready? )
stress_beyond_kernel_buflen(bio, sv_client, global_signal)
# test eof:
sv_client.close
# - there's currently no way to wait for the eof condition to
# propagate from the background read thread
# ...so here's a cheezy sleep
sleep(1)
assert_equal( "", bio.recv_nonblock )
assert( bio.eof )
bio.close
server.close
end
def test_flush
server = TCPServer.new(TEST_PORT)
client = TCPSocket.new(TEST_HOST, TEST_PORT)
sv_client = server.accept
global_signal = GlobalSignal.new
bio = BufferedIO.new(client, global_signal)
# test flush on close (hopefully... kinda hard to test)
10.times { bio.send_nonblock("kazango!!!\n") }
bio.close
10.times { assert_equal( "kazango!!!\n", sv_client.gets ) }
sv_client.close
server.close
end
def stress_beyond_kernel_buflen(bio, sv_client, global_signal)
num_iter = 8192
bio_sent = ""
0.upto(num_iter) do |i|
$stderr.print "." if i%100 == 0
bio.send_nonblock "#{i}\n"
bio_sent << "#{i}\n"
sv_client.puts "#{i}"
end
bio_rcvd = ""
0.upto(num_iter) do |i|
$stderr.print "." if i%100 == 0
assert_equal( "#{i}\n", sv_client.gets )
bio_rcvd << bio.recv_nonblock
end
while bio_rcvd.length < bio_sent.length
global_signal.wait
bio_rcvd << bio.recv_nonblock
end
assert_equal( bio_sent, bio_rcvd )
end
end
end
--ZGiS0Q5IWpPtfppv--