TCP Socket read and write

Brian Schröder

Hello Group,

I'm writing a simple chat client-server as an introductory example, and
now I'm wondering about some things.

First: Do I have to make TCPSocket#puts and TCPSocket#gets thread safe by
using mutexes?

Second: Is there a possibility to use push rather than poll for reading
the Socket. (If it is already threadsafe, then there is nothing to do
here, but if not, I'd have to peek the TCPSocket, and send afterwards if
nothing was in the pipe). Otherwise I could just have a reading and a
sending thread.

Thank you,

Brian
 
Yohanes Santoso

Brian Schröder said:
Hello Group,

I'm writing a simple chat client-server as an introductory example, and
now I'm wondering about some things.

First: Do I have to make TCPSocket#puts and TCPSocket#gets thread safe by
using mutexes?

Second: Is there a possibility to use push rather than poll for reading
the Socket. (If it is already threadsafe, then there is nothing to do
here, but if not, I'd have to peek the TCPSocket, and send afterwards if
nothing was in the pipe). Otherwise I could just have a reading and a
sending thread.

Thank you,

Brian

First: Yes, they are thread safe. However, I think you still want to
synchronise access to the socket otherwise it is like two people
talking at once: confusion. Then again, this depends on the nature of
the protocol you are using (app-level protocol).

Second: you can use IO.select() to determine the readability (and
writability) of a socket. Or you can just use reading and sending
threads. Your choice.
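
A minimal sketch of the IO.select route, assuming a line-oriented protocol. The helper name gets_with_timeout is hypothetical, and the server/client pair on an ephemeral loopback port (port 0) exists only so the example runs on its own. IO.select only reports that some data is readable, so gets can still block if just part of a line has arrived.

```ruby
require 'socket'

# Hypothetical helper: wait up to `timeout` seconds for `sock` to become
# readable, then read one line. Returns nil if nothing arrived in time.
def gets_with_timeout(sock, timeout)
  ready = IO.select([sock], nil, nil, timeout)
  ready ? sock.gets : nil
end

server = TCPServer.new('127.0.0.1', 0)              # port 0: the OS picks a free port
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer   = server.accept

nothing = gets_with_timeout(client, 0.1)            # nothing has been sent yet
peer.puts "hello"
greeting = gets_with_timeout(client, 2.0)
```

Here nothing is nil and greeting is "hello\n"; the caller decides what to do while it waits instead of sitting inside gets.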

YS.
 
Brian Schröder

On Mon, 20 Sep 2004 16:31:53 +0900, Yohanes Santoso wrote:
First: Yes, they are thread safe. However, I think you still want to
synchronise access to the socket otherwise it is like two people
talking at once: confusion. Then again, this depends on the nature of
the protocol you are using (app-level protocol).

Second: you can use IO.select() to determine the readability (and
writability) of a socket. Or you can just use reading and sending
threads. Your choice.

YS.

Hello Yohanes,

in the program only one thread at a time is reading, and only one at a
time is writing. I wanted to know if I could read and write to the same
socket from two different threads simultaneously. And I think this is a
subset of the answer, so I take it for "yes".

Thanks for the answer!

regards,

Brian
 
Brian Candler

Hello Group,

I'm writing a simple chat client-server as an introductory example, and
now I'm wondering about some things.

First: Do I have to make TCPSocket#puts and TCPSocket#gets thread safe by
using mutexes?

Not unless you have two threads which are talking on the same TCP socket at
once (which would be unusual). The generic TCP server skeleton I use looks
like this:

-------------------------------------------------------------------------
require 'socket'
module MyServer
  def run
    print "Hello, world!\r\n"
    line = gets
    print "You said #{line}\r\n"
  end
end

port = ARGV[0] || 9876
bind = ARGV[1] || '0.0.0.0'
server = TCPServer.new(bind, port)

while s = server.accept
  s.extend MyServer
  Thread.new(s) do |session|
    begin
      session.run
    rescue Exception => e
      $stderr.puts "Caught exception: #{e}\n\t#{e.backtrace.join("\n\t")}"
    ensure
      session.close
    end
  end
end
-------------------------------------------------------------------------

A new thread is started for each session, and any local variables
instantiated in the 'run' method (like 'line' in this case) are separate for
each thread, so you don't have to worry about them interfering with each
other.
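
To try the skeleton without a separate client program, here is a cut-down variant bound to an ephemeral loopback port (port 0, read back via server.addr[1]) with the accept loop moved into a background thread. The start_server name is hypothetical; MyServer is repeated unchanged so the block is self-contained.

```ruby
require 'socket'

# Same per-session logic as the skeleton: 'line' is a local variable of
# run, so every session thread has its own copy.
module MyServer
  def run
    print "Hello, world!\r\n"
    line = gets
    print "You said #{line}\r\n"
  end
end

# Hypothetical helper: accept connections in a background thread and start
# one thread per session. Returns the listening socket and the acceptor.
def start_server
  server = TCPServer.new('127.0.0.1', 0)
  acceptor = Thread.new do
    while (s = server.accept)
      s.extend MyServer
      Thread.new(s) do |session|
        begin
          session.run
        rescue StandardError => e
          $stderr.puts "Caught exception: #{e}"
        ensure
          session.close
        end
      end
    end
  end
  [server, acceptor]
end
```

Two clients connected at once each get their own reply, even if they answer in the opposite order to how they connected, because each session's 'line' lives in its own thread.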

The only thing to beware of is the local variable 's' in the main loop. As
soon as the main loop goes back up to the top and a new connection is
accepted, 's' changes. That's why we pass 's' in as a parameter to
Thread.new, where it is copied into the block-local variable 'session' for
use later when we close the connection.
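
The difference between the block parameter and the captured variable can be shown without sockets. In this hypothetical simulation, a plain array stands in for server.accept and the loop ends when s becomes nil; every thread is held back on a Queue until the loop has finished, then reports both the value handed to Thread.new and the current value of s.

```ruby
# Hypothetical simulation of the accept loop: an array stands in for
# server.accept, and the loop ends when s becomes nil.
def spawn_sessions(items)
  gate = Queue.new                  # holds every thread back until the loop is done
  threads = []
  i = 0
  while (s = items[i])
    threads << Thread.new(s) do |session|
      gate.pop
      [session, s]                  # [value passed to Thread.new, current value of s]
    end
    i += 1
  end
  items.size.times { gate << :go }
  threads.map(&:value)
end

results = spawn_sessions([:first, :second, :third])
p results   # => [[:first, nil], [:second, nil], [:third, nil]]
```

Each thread still sees its own item through session, but s has moved on by the time the threads run (here to nil; in the real server it would be a later connection).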

If you don't like the way I add a 'run' method to TCPSocket's singleton
class, you can always put the code in-line:

...
while s = server.accept
  Thread.new(s) do |session|
    begin
      session.print "Hello, world!\r\n"
      line = session.gets
      session.print "You said #{line}\r\n"
    rescue Exception => e
...

Second: Is there a possibility to use push rather than poll for reading
the Socket. (If it is already threadsafe, then there is nothing to do
here, but if not, I'd have to peek the TCPSocket, and send afterwards if
nothing was in the pipe). Otherwise I could just have a reading and a
sending thread.

I'm not really sure what you mean here. You read the socket by calling
TCPSocket#read or TCPSocket#gets. If you want to check whether there's data
available, then you can call select first - although that only guarantees
there's one byte available, so you'd have to do read(1) to guarantee that
you never block.
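
A sketch of the select-then-read-one-byte approach described above. read_available is a hypothetical helper. It uses IO#sysread(1), which bypasses Ruby's internal read buffer, so select's answer reflects exactly what is left in the kernel; sysread should not be mixed with buffered reads such as gets on the same socket.

```ruby
require 'socket'

# Hypothetical helper: return the bytes that are already available.
# Waits up to first_wait seconds for the first byte, then reads one byte
# at a time for as long as select (with a zero timeout) still reports
# the socket as readable.
def read_available(sock, first_wait)
  buf = String.new
  timeout = first_wait
  while IO.select([sock], nil, nil, timeout)
    begin
      buf << sock.sysread(1)
    rescue EOFError
      break                      # peer closed the connection
    end
    timeout = 0
  end
  buf
end

server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer   = server.accept

peer.write "abc"
got   = read_available(client, 2.0)
empty = read_available(client, 0.05)   # nothing more has been sent
```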

If you want to stream data out at the same time as streaming data in (rather
than lock-step command-response-command-response), then yes you'd use two
threads, one reading and one writing. As long as one only does 'reads' and
one only does 'writes', then you don't need to mutex them. But you may need
to signal between them so that if one side detects the socket has been
closed, the other terminates as well.
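
A minimal sketch of the two-thread arrangement, assuming newline-terminated messages. start_duplex is a hypothetical name. The reader only calls gets and the writer only calls write, so the socket itself needs no Mutex; the Queue that feeds the writer doubles as the signal, since the reader pushes nil into it when it sees EOF. Only the reader-to-writer direction is covered: a write error would simply end the writer thread.

```ruby
require 'socket'

# Run one reading thread and one writing thread on the same socket.
# Each received line is passed to the block; outbox feeds the writer.
def start_duplex(sock, outbox, &on_line)
  reader = Thread.new do
    while (line = sock.gets)
      on_line.call(line)
    end
    outbox << nil              # peer closed: tell the writer to finish
  end
  writer = Thread.new do
    while (msg = outbox.pop)   # nil ends the loop
      sock.write(msg)
    end
  end
  [reader, writer]
end
```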

Regards,

Another Brian.
 
Bill Kelly

Hi,

Brian Schröder said:
in the program only one thread at a time is reading, and only one at a
time is writing. I wanted to know if I could read and write to the same
socket from two different threads simultaneously. And I think this is a
subset of the answer, so I take it for "yes".

I didn't know if two threads simultaneously accessing the same
socket was legal (I think I asked about this here a year or
two ago), so I'm doing a $sock.dup, with the read thread using one
and the write thread using the other. I don't know if the
#dup is necessary or beneficial, actually. But for what it's
worth, here's how I'm using it:

http://bwk.homeip.net/ftp/dorkbuster/wallfly/buffered-io.rb

The above is an IO class whose read thread just slurps data
as fast as it can in the background. And whose write thread
writes data out, similarly, when it can. So the code interfacing
with this class gets a non-blocking read & write, with infinite
buffer size. (Up to available RAM of course.) For my purposes
it has been convenient... dunno if it would be useful to anyone
else. If so I could put it on RAA (?)

It's been stress tested with as many variations of reads, writes,
random & non random timing delays as I could think of to devise,
running for days under these stress conditions without a hiccup.
I realize that doesn't prove anything, but I have a good feeling
about its reliability at this point.

For what it's worth :)

Regards,

Bill
 
Brian Candler

Hi,



I didn't know if two threads simultaneously accessing the same
socket was legal

I believe it is, if you think about how Ruby implements threads internally.
It doesn't use any O/S threading at all (it even works under MS-DOS, not
that I've tried it myself :)

Rather, the Ruby interpreter chugs along the annotated syntax tree, changes
to another Thread, chugs along another bit of syntax tree, and so on. For
threads which are blocked on I/O, it uses select() to determine when they
are ready to be scheduled again.

http://bwk.homeip.net/ftp/dorkbuster/wallfly/buffered-io.rb

The above is an IO class whose read thread just slurps data
as fast as it can in the background. And whose write thread
writes data out, similarly, when it can. So the code interfacing
with this class gets a non-blocking read & write, with infinite
buffer size. (Up to available RAM of course.) For my purposes
it has been convenient... dunno if it would be useful to anyone
else. If so I could put it on RAA (?)

You stuff data down one socket and read it back from the same socket, like a
loopback? If so I think it could be written much more simply; for example it
is superfluous to write

if select([@sock_rd], nil, nil, nil)
  dat = @sock_rd.recv(65536)

when you can just do

dat = @sock_rd.recv(65536)

because Ruby handles the select() behind the scenes to prevent one thread
blocking another, as outlined above; recv deschedules the thread until at
least one byte is available.
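
A small demonstration that a thread blocked in recv does not hold up the rest of the program: no select is involved, yet the main thread keeps running while the reader waits. On the Ruby 1.8 interpreter described above this came from the interpreter's own scheduling; current Ruby versions use native threads and release the global VM lock during blocking I/O, with the same visible result. The loopback sockets are only there to make the example self-contained.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer   = server.accept

# No select: this thread simply blocks in recv until data arrives.
reader = Thread.new { client.recv(65536) }

ticks = 0
5.times do
  ticks += 1               # the main thread keeps running meanwhile
  sleep 0.01
end

peer.write "late data"
data = reader.value        # joins the reader and returns what recv got
```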

In fact I think the core could be re-written as something like this:
-----------------------------------------------------------------------
require 'thread'

class BufferedIO
  def initialize(sock)
    @sock = sock
    @queue = Queue.new
    @rd_thread = Thread.new { background_read }
    @wr_thread = Thread.new { background_write }
  end

  def background_read
    while true
      dat = @sock.recv(65536)
      break if dat.nil? or dat.empty?
      @queue.push(dat)
    end
    @queue.push(nil) # EOF indication
  end

  def background_write
    while dat = @queue.pop
      @sock.write(dat)
    end
  end

  def close
    @sock.close
    @rd_thread.join
    @wr_thread.join
  end
end
-----------------------------------------------------------------------

which looks a lot less like C and a lot more like Ruby :)

However that doesn't include your 'signal' functionality, nor do I have
access to your timed-wait.rb, so I can't prove it against your Unit tests.

Not sure how useful such a thing would be for RAA though. There is already
the "Queue" class in thread.rb, which is a queue of objects rather than a
queue of bytes, as I've used above. I think that's a more generic and useful
pattern. There is also a SizedQueue which limits the maximum number of
objects it contains.

Queue and SizedQueue are thread-safe, which is why there are no Mutexes in
the code above, although if paranoid you might want one to ensure that
@sock.read, @sock.write and @sock.close are mutually exclusive. (But then,
if @sock.write blocked, that would prevent @sock.read from running, which I
don't think is what you want.)

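
To see the SizedQueue limit in action, here is a hypothetical producer/consumer pair: the producer pushes five items into a SizedQueue of capacity 2 and blocks whenever it is full, while a deliberately slow consumer drains it. nil is used as an end-of-stream marker, as in the BufferedIO sketch above.

```ruby
# Hypothetical demo: move items through a SizedQueue with a small capacity.
# The producer blocks in push whenever the queue already holds `capacity`
# items, so the queue can never grow beyond that size.
def bounded_transfer(items, capacity)
  queue = SizedQueue.new(capacity)
  producer = Thread.new do
    items.each { |item| queue.push(item) }   # blocks while the queue is full
    queue.push(nil)                          # end-of-stream marker
  end

  received = []
  max_depth = 0
  loop do
    max_depth = [max_depth, queue.size].max
    item = queue.pop
    break if item.nil?
    received << item
    sleep 0.001                              # slow consumer lets the queue fill up
  end
  producer.join
  [received, max_depth]
end

received, max_depth = bounded_transfer([1, 2, 3, 4, 5], 2)
```

Everything arrives in order, and the queue depth never exceeds 2; with a plain Queue the producer would never wait and memory use would be bounded only by the data.
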
It's been stress tested with as many variations of reads, writes,
random & non random timing delays as I could think of to devise,
running for days under these stress conditions without a hiccup.
I realize that doesn't prove anything, but I have a good feeling
about its reliability at this point.

That's definitely good :)

Regards,

Brian.
 
Bill Kelly

Hi Brian,

Brian Candler said:
I believe it is, if you think about how Ruby implements threads internally.
It doesn't use any O/S threading at all (it even works under MS-DOS, not
that I've tried it myself :)

Ahh.. That does make sense...

You stuff data down one socket and read it back from the same socket, like a
loopback?

Oh.. hehe... No, it's intended to be connected to a remote
socket. (Or at least, to a socket whose other end is
connected to a different process.)

I suppose in practice it's a convenience mechanism. It lets
my main thread blast an arbitrarily large chunk of data at
the BufferedIO#send_nonblock without having to wait. The main
thread can go about its business, knowing that data will be
sent by BufferedIO to the remote host as fast as it can be.

And similarly, in the reverse direction, BufferedIO is
continually retrieving any data sent by the remote host
to a buffer, that will be available via BufferedIO#recv_nonblock,
whenever the main thread gets around to checking for it.

If so I think it could be written much more simply; for example it
is superfluous to write

if select([@sock_rd], nil, nil, nil)
  dat = @sock_rd.recv(65536)

when you can just do

dat = @sock_rd.recv(65536)

because Ruby handles the select() behind the scenes to prevent one thread
blocking another, as outlined above; recv deschedules the thread until at
least one byte is available.

Good point, thanks!

In fact I think the core could be re-written as something like this:
-----------------------------------------------------------------------
require 'thread'

class BufferedIO
  def initialize(sock)
    @sock = sock
    @queue = Queue.new
    @rd_thread = Thread.new { background_read }
    @wr_thread = Thread.new { background_write }
  end

  def background_read
    while true
      dat = @sock.recv(65536)
      break if dat.nil? or dat.empty?
      @queue.push(dat)
    end
    @queue.push(nil) # EOF indication
  end

  def background_write
    while dat = @queue.pop
      @sock.write(dat)
    end
  end

  def close
    @sock.close
    @rd_thread.join
    @wr_thread.join
  end
end

Hmm... I like it!

However that doesn't include your 'signal' functionality, nor do I have
access to your timed-wait.rb, so I can't prove it against your Unit tests.

Sorry, timed-wait.rb is there now.

Not sure how useful such a thing would be for RAA though. There is already
the "Queue" class in thread.rb, which is a queue of objects rather than a
queue of bytes, as I've used above. I think that's a more generic and useful
pattern. There is also a SizedQueue which limits the maximum number of
objects it contains.

Queue and SizedQueue are thread-safe, which is why there are no Mutexes in
the code above, although if paranoid you might want one to ensure that
@sock.read, @sock.write and @sock.close are mutually exclusive. (But then,
if @sock.write blocked, that would prevent @sock.read from running, which I
don't think is what you want)

Thanks!

Yes, I guess BufferedIO is somewhat like a Queue (of bytes) for
processes connected between sockets. Ultimately it's an experiment,
I'm interested to see whether it has a simplifying effect on my
"main thread" code, or not. Previously, my application was single-
threaded, and was a typical select() dispatch serving many clients.

I'm sort of injecting BufferedIO into this application to give me
the effect of (as though) arbitrarily large send/recv buffers in
the kernel. So my application is still in an equivalent of its
select() dispatch, only now waiting on this global-signal instead.
And able to toss arbitrarily large results of data at a client
without getting hung up on the transmit.

I don't know if in the end it will be beneficial, or if it
would be better to just rewrite the app to be completely
multi-threaded, and get rid of this main thread dispatch
notion entirely. I have always had a love-hate relationship
with both multithreading, and single-threaded select() dispatch
loops. <grin>


Thanks for your feedback & insights,


Regards,

Bill
 
Brian Candler

Oh.. hehe... No, it's intended to be connected to a remote
socket. (Or at least, to a socket whose other end is
connected to a different process.)

I suppose in practice it's a convenience mechanism. It lets
my main thread blast an arbitrarily large chunk of data at
the BufferedIO#send_nonblock without having to wait. The main
thread can go about its business, knowing that data will be
sent by BufferedIO to the remote host as fast as it can be.

Oh I see now.

Well, I can make my code have an API like yours, but it seems very unnatural
to me. For example, #recv_nonblock always returns "" if no data is available,
but you cannot do a select() on a BufferedIO object, so you have to waste
time polling it.

My code fails your unit tests with some deadlock problem. I don't understand
what timed-wait is doing, so I can't really fix it. I've never found I've
had to use Thread.stop and the like; Ruby's primitives like Mutex, Queue,
SizedQueue, and Timeout always seem to do the job.

I think what I'm saying is: there's a more natural way to do it in Ruby.

I don't know if in the end it will be beneficial, or if it
would be better to just rewrite the app to be completely
multi-threaded, and get rid of this main thread dispatch
notion entirely. I have always had a love-hate relationship
with both multithreading, and single-threaded select() dispatch
loops. <grin>

For me, it's now a love-love relationship. I've seen programs which consist
of pages and pages of C, implementing select() across arrays of
filedescriptors and an array of state machines for each socket; then I've
rewritten it into a couple of screens of Ruby using Threads. The Ruby code
works far better, as it's less buggy :)

Regards,

Brian.

[Attachment: buffered-io2.rb]

require 'thread'
require 'global-signal'

class BufferedIO
  def initialize(sock, data_ready_signal = nil)
    @sock = sock
    @rd_queue = Queue.new
    @wr_queue = Queue.new
    @data_ready_signal = data_ready_signal
    @eof = false
    @rd_thread = Thread.new { background_read }
    @wr_thread = Thread.new { background_write }
  end

  def recv_nonblock
    return "" if @rd_queue.empty?
    @rd_queue.pop
  end

  def recv_ready?
    not @rd_queue.empty?
  end

  def send_nonblock(str)
    @wr_queue.push(str)
  end

  def write_pending?
    not @wr_queue.empty?
  end

  def peeraddr
    @sock.peeraddr
  end

  def eof
    @eof and @rd_queue.empty?
  end

  def close
    # flush queues??
    @sock.close
    @rd_thread.join
    @wr_thread.join
  end

  protected
  def background_read
    while true
      dat = @sock.recv(65536)
      break if dat.nil? or dat.empty?
      @rd_queue.push(dat)
      @data_ready_signal.signal if @data_ready_signal
    end
    @eof = true
    @rd_queue.push(nil)
  end

  def background_write
    while dat = @wr_queue.pop
      @sock.write(dat)
    end
  end
end

# Same unit tests as before

if $0 == __FILE__

  require 'test/unit'
  require 'socket'

  TEST_HOST = 'localhost'
  TEST_PORT = 12345

  class TestBufferedIO < Test::Unit::TestCase

    def test_bio
      server = TCPServer.new(TEST_PORT)
      client = TCPSocket.new(TEST_HOST, TEST_PORT)
      sv_client = server.accept
      global_signal = GlobalSignal.new

      bio = BufferedIO.new(client, global_signal)

      assert( ! bio.eof )
      assert( ! bio.recv_ready? )
      assert_equal( "", bio.recv_nonblock )

      # send some data... wait for signal...
      sv_client.print("kazango!\n")
      global_signal.wait
      assert( bio.recv_ready? )
      assert_equal( "kazango!\n", bio.recv_nonblock )
      assert( ! bio.recv_ready? )

      stress_beyond_kernel_buflen(bio, sv_client, global_signal)

      # test eof:
      sv_client.close
      # - there's currently no way to wait for the eof condition to
      #   propagate from the background read thread :(
      #   ...so here's a cheesy sleep :(
      sleep(1)
      assert_equal( "", bio.recv_nonblock )
      assert( bio.eof )

      bio.close
      server.close
    end

    def test_flush
      server = TCPServer.new(TEST_PORT)
      client = TCPSocket.new(TEST_HOST, TEST_PORT)
      sv_client = server.accept
      global_signal = GlobalSignal.new
      bio = BufferedIO.new(client, global_signal)

      # test flush on close (hopefully... kinda hard to test)
      10.times { bio.send_nonblock("kazango!!!\n") }
      bio.close
      10.times { assert_equal( "kazango!!!\n", sv_client.gets ) }

      sv_client.close
      server.close
    end

    def stress_beyond_kernel_buflen(bio, sv_client, global_signal)
      num_iter = 8192
      bio_sent = ""
      0.upto(num_iter) do |i|
        $stderr.print "." if i%100 == 0
        bio.send_nonblock "#{i}\n"
        bio_sent << "#{i}\n"
        sv_client.puts "#{i}"
      end
      bio_rcvd = ""
      0.upto(num_iter) do |i|
        $stderr.print "." if i%100 == 0
        assert_equal( "#{i}\n", sv_client.gets )
        bio_rcvd << bio.recv_nonblock
      end
      while bio_rcvd.length < bio_sent.length
        global_signal.wait
        bio_rcvd << bio.recv_nonblock
      end
      assert_equal( bio_sent, bio_rcvd )
    end

  end

end

 
Bill Kelly

Hi Brian,

Brian Candler said:
Oh I see now.

Well, I can make my code have an API like yours, but it seems very unnatural
to me. For example, #recv_nonblock always returns "" if no data is available,
but you cannot do a select() on a BufferedIO object, so you have to waste
time polling it.

The data_ready_signal passed into BufferedIO provides a means to
wait rather than poll. My application, which is currently structured
around a select() dispatch, uses global-signal instead, as in effect
a drop-in replacement for select().

So my main loop, instead of using select, like:

nready = select(@tcp_clients, nil, nil, timeout)

Has a semantically similar, if you will, timed wait like:

begin
  @global_signal.timed_wait(timeout)
rescue Timeout::Error

My code fails your unit tests with some deadlock problem. I don't understand
what timed-wait is doing, so I can't really fix it. I've never found I've
had to use Thread.stop and the like; Ruby's primitives like Mutex, Queue,
SizedQueue, and Timeout always seem to do the job.

I wasn't happy with how complex timed_wait was when I wrote it.
It was the culmination of a week-long learning experience, bug
hunt, and wild goose chase, coming to an understanding of why
Ruby's Timeout is hazardous to my health. Timeout will interrupt
your thread anywhere, even inside an ensure block. (This makes
it difficult if not impossible to use a timeout block on code
that internally uses ensure to release some critical resource.)
It all started when I tried to do a timeout around a condition
variable wait.
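
A small demonstration of the hazard, using the timeout library that ships with current Ruby: the guarded work finishes at once, but its ensure clause needs 0.3 s of cleanup while the deadline is 0.1 s. Without protection the timeout fires in the middle of the ensure clause and the cleanup never completes. Thread.handle_interrupt, added in Ruby 2.0 and so not available when this thread was written, can defer the interrupt until the cleanup is done. The method name cleanup_finished? is hypothetical.

```ruby
require 'timeout'

# Hypothetical demo: the guarded work finishes immediately, but its ensure
# clause needs 0.3 s of cleanup while the deadline is only 0.1 s.
# Returns true if the cleanup ran to completion.
def cleanup_finished?(protect)
  cleaned = false
  slow_cleanup = lambda do
    sleep 0.3
    cleaned = true
  end

  begin
    Timeout.timeout(0.1) do
      begin
        # ... the real work would go here ...
      ensure
        if protect
          # Defer asynchronous interrupts (including Timeout's) until the
          # cleanup has finished; the pending timeout is raised afterwards.
          Thread.handle_interrupt(Object => :never) { slow_cleanup.call }
        else
          slow_cleanup.call      # the timeout can strike in the middle of this
        end
      end
    end
  rescue Timeout::Error
    # expected either way: the deadline passed while cleanup was running
  end
  cleaned
end
```

cleanup_finished?(false) comes back false, cleanup_finished?(true) comes back true; in both cases the caller still sees the timeout.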

My quest started here:
Subject: How safe is 'timeout' ?
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/110306

And ended here with the timed_wait you see now:
Subject: Re: Request For Comments: exception safe ConditionVariable#wait
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/110799

I think what I'm saying is: there's a more natural way to do it in Ruby.

Yes, from the complexity of some of the resultant code, I seem
to be fighting the language to make this drop-in replacement for
select-with-timeout.

It seemed like a simple idea. I had a single-threaded app that
uses select() with a timeout. I thought my life would be simpler
if I could keep that structure, but just increase the buffer
size to infinity on my IO objects behind the scenes. . . . .
However, by this point, I could have long since rewritten the
app to be totally multithreaded. ;-/

However - should you ever need a ConditionVariable#wait with a
timeout--as my app does as it's *currently* structured--I haven't
yet found a simpler way to do it safely than that ugly timed_wait.

For me, it's now a love-love relationship. I've seen programs which consist
of pages and pages of C, implementing select() across arrays of
filedescriptors and an array of state machines for each socket; then I've
rewritten it into a couple of screens of Ruby using Threads. The Ruby code
works far better, as it's less buggy :)

I'm contemplating going ahead and thoroughly restructuring my
app, around ruby threads and primitives like Queue, and see
how it turns out. I will avoid Timeout, however, unless I'm
certain the code within the timeout block makes no use of
ensure or otherwise has no need to manage any critical resources
or maintain class invariants, etc...


Thanks again for your thoughts,

Regards,

Bill
 
Brian Candler

Well, I can make my code have an API like yours, but it seems very unnatural
to me. For example, #recv_nonblock always returns "" if no data is available,
but you cannot do a select() on a BufferedIO object, so you have to waste
time polling it.

The data_ready_signal passed into BufferedIO provides a means to
wait rather than poll. My application, which is currently structured
around a select() dispatch, uses global-signal instead, as in effect
a drop-in replacement for select().

So my main loop, instead of using select, like:

nready = select(@tcp_clients, nil, nil, timeout)

Has a semantically similar, if you will, timed wait like:

begin
  @global_signal.timed_wait(timeout)
rescue Timeout::Error

Ah, I see now.

Perhaps what would be nice here is a version of Queue#pop with a timeout.
Then you don't need two separate objects, one to signal "data ready" and one
to carry the data itself.

Timeout will interrupt
your thread anywhere, even inside an ensure block. (This makes
it difficult if not impossible to use a timeout block on code
that internally uses ensure to release some critical resource.)

Yes, I see the problem. It's hard to see a general-purpose solution to this;
after all, a timeout has no way of knowing whether you're in a short-lived
"ensure" block, or something which has gone wrong and really does need to be
aborted (such as an infinite loop within your "ensure" block)

However, specific solutions should be doable. Here's a first stab at a
Queue#pop_timeout, see if this looks reasonable to you:

require 'thread'
require 'timeout'

# Relies on Ruby 1.8 internals: Thread.critical and Queue's private
# @que and @waiting arrays, as found in 1.8's thread.rb.
class Queue
  def pop_timeout(secs, e=Timeout::Error)
    me = Thread.current
    timedout = false
    # The timer thread only sets a flag and wakes us; it never raises into us.
    timer = Thread.new { sleep(secs); timedout=true; me.wakeup }
    while (Thread.critical = true; @que.empty?)
      raise e if timedout
      @waiting.push Thread.current
      Thread.stop
    end
    @que.shift
  ensure
    timer.kill if timer and timer.alive?
    Thread.critical=false
  end
end

if __FILE__ == $0
  queue = Queue.new
  Thread.new { queue.push("hello"); sleep(1); queue.push("world") }
  while true
    a = queue.pop_timeout(3)   # raises once the queue stays empty for 3 seconds
    p a
  end
end

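
Thread.critical and Queue's internal @que/@waiting no longer exist in current Ruby, so the code above only runs on 1.8. A sketch of the same idea for current Ruby, built only on Mutex and ConditionVariable#wait(mutex, timeout): TimedQueue is a hypothetical class, and the loop re-checks a monotonic deadline because a condition-variable wait may return early.

```ruby
require 'timeout'

# Hypothetical queue whose pop can give up after a deadline. The timeout is
# handled by ConditionVariable#wait itself, so no extra thread ever raises
# into the waiting thread.
class TimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @items.push(item)
      @cond.signal                      # wake one waiting pop, if any
    end
    self
  end

  # Return the next item, or raise Timeout::Error if none arrives in secs.
  def pop_timeout(secs)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + secs
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise Timeout::Error, "pop timed out" if remaining <= 0
        @cond.wait(@mutex, remaining)   # may wake early; the loop re-checks
      end
      @items.shift
    end
  end
end
```

Because the Timeout::Error is raised by the waiting thread itself, at a point it chose, it cannot land inside someone else's ensure clause.
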
It all started when I tried to do a timeout around a condition
variable wait. ...
However - should you ever need a ConditionVariable#wait with a
timeout--as my app does as it's *currently* structured--I haven't
yet found a simpler way to do it safely than that ugly timed_wait.

Perhaps I've missed something, but I don't see why
ConditionVariable#wait_timeout can't be implemented safely. Unlike the
'timeout' module, you don't need to have the timeout thread itself raise an
exception; it can just set a flag.

This code is untested but is just to sketch a solution:

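class ConditionVariable
  # ConditionVariable#wait from Ruby 1.8's thread.rb, plus a timeout.
  # The timer thread only sets a flag and wakes the waiter; it never raises.
  def wait_timeout(mutex, timeout=nil, e=Timeout::Error)
    timedout = false
    if timeout
      me = Thread.current
      timer = Thread.new { sleep(timeout); timedout=true; me.wakeup }
    end
    mutex.exclusive_unlock do
      @waiters.push(Thread.current)
      Thread.stop
    end
    mutex.lock        # re-acquire first, so the caller always gets the mutex back
    raise e if timedout
  ensure
    timer.kill if timer and timer.alive?
  end
end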

It's basically just ConditionVariable#wait copied from thread.rb, with an
extra "raise e if timedout" at the end. Comments?
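
The flag-setting idea can be sketched on current Ruby without touching any library internals. wait_for is a hypothetical helper: it waits until a block (evaluated while holding the mutex) returns a true value, or until a timer thread sets a timed_out flag and broadcasts on the condition variable. Nothing is ever raised into the waiting thread, so ensure clauses in the caller are never interrupted. It returns the block's last value, which is falsy on timeout.

```ruby
# Hypothetical helper: wait until the block returns a truthy value or
# `secs` seconds pass. The timer thread never raises into the waiter;
# it only sets a flag (under the mutex) and wakes it up.
def wait_for(mutex, cond, secs)
  timed_out = false
  timer = Thread.new do
    sleep secs
    mutex.synchronize do
      timed_out = true
      cond.broadcast            # wake the waiter so it can see the flag
    end
  end

  result = nil
  mutex.synchronize do
    cond.wait(mutex) until (result = yield) || timed_out
  end
  result                        # falsy when we gave up because of the timeout
ensure
  timer.kill if timer
end
```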

Cheers,

Brian.
 