Thread.list confusion

Andrew S. Townley

Still in pursuit of trying to figure out what is going on with my
threads...

Have done the following:

class Thread
  alias __old_initialize initialize

  def initialize(*args, &block)
    __old_initialize(*args, &block)
    STDERR.puts "created thread: #{inspect}"
    STDERR.puts block.to_s
  end
end

This is above any require 'blah' statements so, this should ensure that
any threads created use my hacked version, right?

Maybe I *really* don't understand what's happening here (highly likely),
but my assumption is that I should have an equal number of threads going
through my code as appear in the Thread.list. However, based on my
current numbers, I have 13 calls to my initialize (one of these is from
drb, so it must be doing what I think). However, the Thread list
contains 33 entries (32 sleeping & 1 running).
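
For readers who want to reproduce a breakdown like "32 sleeping & 1 running", here is a minimal sketch that tallies Thread.list by status. The helper name thread_census is made up for illustration and is not something from the original post; the two sleeping threads exist only to give the tally something to count.

```ruby
# Hypothetical helper (not from the post): tally Thread.list by Thread#status.
def thread_census
  Thread.list.each_with_object(Hash.new(0)) do |thread, counts|
    counts[thread.status] += 1
  end
end

# Two throwaway threads that just sleep, so there is something to count.
sleepers = Array.new(2) { Thread.new { sleep } }
sleep 0.01 until sleepers.all? { |t| t.status == "sleep" }

census = thread_census
puts census.inspect

# Clean up the throwaway threads.
sleepers.each(&:kill).each(&:join)
```

Comparing a tally like this before and after each require can help narrow down which library starts the extra threads.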

I've tried various instrumentation stuff, including things like:

#set_trace_func proc { |event, file, line, id, binding, classname|
# if classname.to_s =~ /read/
# printf("%-10s %10s:%-2d %10s %8s\n", event, file, line, id, classname)
# STDOUT.flush
# end
#}

But, I really can't figure out what is happening. From what my code is
doing, I would expect there to be 13 threads created (counting the main
thread). This perfectly matches up with the number of calls to my
hacked Thread class. Anyone have any ideas?
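
On a current Ruby (this is an assumption; the thread dates from Ruby 1.8, which had only set_trace_func), TracePoint's :thread_begin event is another way to count thread starts without patching Thread#initialize. A rough sketch:

```ruby
started = Queue.new

# :thread_begin fires inside each new thread as it starts running, so the
# hook does not depend on which constructor created the thread.
trace = TracePoint.new(:thread_begin) { started << Thread.current }
trace.enable

Thread.new { }.join
Thread.start { }.join
Thread.fork { }.join   # Thread.fork is an alias for Thread.start

trace.disable
puts "threads started while tracing: #{started.size}"
```

Because the hook runs in the new thread itself, it should also see threads that a library creates through Thread.start or Thread.fork.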

Also, I've looked at this from a number of different angles, but there
appears to be no way to create a normal thread pool with Ruby. Is this
really correct? I found something about this on www.rubygarden.org, but
this isn't exactly what I want (but it is similar in spirit). From
experimentation, it seems that you get one shot for Thread.new. This
seems a bit expensive, but maybe that's just the way it is.

Thanks in advance,

ast

***************************************************************************************************
The information in this email is confidential and may be legally privileged. Access to this email by anyone other than the intended addressee is unauthorized. If you are not the intended recipient of this message, any review, disclosure, copying, distribution, retention, or any action taken or omitted to be taken in reliance on it is prohibited and may be unlawful. If you are not the intended recipient, please reply to or forward a copy of this message to the sender and delete the message, any attachments, and any copies thereof from your system.
***************************************************************************************************
 
Eric Hodel

Still in pursuit of trying to figure out what is going on with my
threads...

Have done the following:

class Thread
  alias __old_initialize initialize

  def initialize(*args, &block)
    __old_initialize(*args, &block)
    STDERR.puts "created thread: #{inspect}"
    STDERR.puts block.to_s
  end
end

This is above any require 'blah' statements so, this should ensure that
any threads created use my hacked version, right?

Yes

Maybe I *really* don't understand what's happening here (highly likely),
but my assumption is that I should have an equal number of threads going
through my code as appear in the Thread.list. However, based on my
current numbers, I have 13 calls to my initialize (one of these is from
drb, so it must be doing what I think). However, the Thread list
contains 33 entries (32 sleeping & 1 running).

ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each
thread you spawn so you can see who is spawning the extra threads.
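
A minimal sketch of the ThreadGroup idea, relying on the documented rule that a new thread joins the group of the thread that created it. The helper spawn_in_group and the two "subsystems" are hypothetical stand-ins, not code from this thread.

```ruby
# Hypothetical helper: run the block in a thread that belongs to a fresh
# ThreadGroup, wait for that starter thread to finish, and return the group.
# Threads created inside the block inherit the starter's group.
def spawn_in_group
  group = ThreadGroup.new
  Thread.new do
    group.add(Thread.current)
    yield
  end.join
  group
end

# Stand-ins for two libraries that start background threads.
first  = spawn_in_group { 3.times { Thread.new { sleep } } }
second = spawn_in_group { Thread.new { sleep } }

puts "first subsystem:  #{first.list.size} threads"
puts "second subsystem: #{second.list.size} threads"
puts "default group:    #{ThreadGroup::Default.list.size} threads"

spawned = first.list + second.list
spawned.each(&:kill).each(&:join)
```

Wrapping each require or service start-up in its own group this way makes it possible to see which subsystem owns which entries in Thread.list.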

DRb spawns a thread per connection. To see DRb's threads, do:

drb = DRb.start_service
drb.instance_variable_get('@grp').list

Also, I've looked at this from a number of different angles, but it
appears to be no way to create a normal thread pool with Ruby. Is this
really correct? I found something about this on www.rubygarden.org, but
this isn't exactly what I want (but it is similar in spirit). From
experimentation, it seems that you get one shot for Thread.new. This
seems a bit expensive, but maybe that's just the way it is.

What do you want?
 
Andrew S. Townley

Hi Eric

On 29 Aug 2005, at 08:57, Andrew S. Townley wrote: [snip]

ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each
thread you spawn so you can see who is spawning the extra threads.

Hmmm... I think I might use a ThreadGroup, no? ;)

Ok, this was just selective stupidity on my part. Apologies. After
digging out my Doug Lea Concurrent Java book, I saw what I was
forgetting...

What do you want?

Something like this (actually, it was straightforward enough once I
thought about it a little):

$ cat tpool.rb
require 'thread'

class ThreadPool
  def initialize(size)
    @work = Queue.new
    @workers = []
    @group = ThreadGroup.new
    @shutdown = false
    @sh_mutex = Mutex.new
    size.times do
      @workers << t = Thread.new { Thread.stop; thread_work }
      @group.add(t)
    end
    @monitor = Thread.new do
      Thread.stop
      loop do
        @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
        sleep(1)
      end
    end
  end

  def <<(runnable)
    @work << runnable
    self
  end

  def thread_work
    loop do
      @sh_mutex.synchronize do
        if @shutdown
          puts "#{Thread.current} stopping"
          Thread.current.terminate
        end
      end
      puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
      job = @work.deq
      begin
        job.run if job != nil
        Thread.pass
      rescue => e
        puts e
        next
      end
    end
  end

  def start
    @workers.each { |w| w.run }
    @monitor.run
  end

  def join
    @monitor.join
  end

  def shutdown(wait = true)
    @sh_mutex.synchronize { @shutdown = true }
    @workers.each { |w| w.join if w.alive? } if wait
  end

  attr_reader :group
end

class Runnable
  def initialize(*args, &block)
    @block = block
  end

  def run
    @block.call
  end
end

pool = ThreadPool.new(8)

pool.start
job1 = Runnable.new do
  3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
end

vagrant = Runnable.new { raise "broken" }

pool << job1 << vagrant << job1 << job1 << job1 << job1
pool << vagrant << job1 << job1 << vagrant << vagrant << job1

Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
pool.join
pool.shutdown

puts "Thread group"
pool.group.list.each { |w| puts w.inspect }

puts "Thread.list"
Thread.list.each { |w| puts w.inspect }


Eric Hodel

Hi Eric

On 29 Aug 2005, at 08:57, Andrew S. Townley wrote:
[snip]

ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each
thread you spawn so you can see who is spawning the extra threads.

Hmmm... I think I might use a ThreadGroup, no? ;)

Ok, this was just selective stupidity on my part. Apologies. After
digging out my Doug Lea Concurrent Java book, I saw what I was
forgetting...

What do you want?

Something like this (actually, it was straightforward enough once I
thought about it a little):

$ cat tpool.rb
require 'thread'

class ThreadPool
  def initialize(size)
    @work = Queue.new
    # @workers = []

You don't need to keep threads in an Array because you don't keep
track of their status or #value. (It's better to just push a value
onto a Queue than to check #value because you can be sure you're
always getting something that is valid.)

    @group = ThreadGroup.new
    @shutdown = false
    @sh_mutex = Mutex.new

I don't think this mutex protects anything; assignment of a constant
will be atomic.

    size.times do
      Thread.new { @group.add Thread.current; Thread.stop; thread_work }
    end
    @monitor = Thread.new do
      Thread.stop
      loop do
        @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
        sleep(1)
      end
    end
  end

  def <<(runnable)
    @work << runnable
    self
  end

  def thread_work
    loop do
      @sh_mutex.synchronize do
        if @shutdown
          puts "#{Thread.current} stopping"
          Thread.current.terminate
        end
      end
      puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
      job = @work.deq
      begin
        job.run if job != nil
        Thread.pass
      rescue => e
        puts e
        next
      end
    end
  end

  def start
    @group.list.each { |w| w.run }
    @monitor.run
  end

  def join
    @monitor.join
  end

  def shutdown(wait = true)
    @group.enclose
    @sh_mutex.synchronize { @shutdown = true }
    @group.list.first.join until @group.list.empty? if wait
  end

  attr_reader :group
end

class Runnable
  def initialize(*args, &block)
    @block = block
  end

  def run
    @block.call
  end
end

pool = ThreadPool.new(8)

pool.start
job1 = Runnable.new do
  3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
end

vagrant = Runnable.new { raise "broken" }

pool << job1 << vagrant << job1 << job1 << job1 << job1
pool << vagrant << job1 << job1 << vagrant << vagrant << job1

Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
pool.join
pool.shutdown

puts "Thread group"
pool.group.list.each { |w| puts w.inspect }

The group should always be empty after shutdown if you waited. A
ThreadGroup does not hold dead threads.
 
Eric Hodel

On Tue, 2005-08-30 at 19:05, Eric Hodel wrote:
[lots of really useful feedback deleted]

Thanks for the feedback, Eric. I'd actually made some other changes to
it prior to getting your comments. I have to say, looking at the
difference, I can see some value in the ThreadGroup, but I don't really
get why you're so excited about it... this is for another day, I think.

In other news, I found the mystery multiplying threads and have a
problem that I'm not quite sure how to solve with the tools at hand...
Maybe someone out there can help, but maybe the answer is to just roll
my own something-or-other again.

The problem came from two seemingly benign blocks of code:

$ cat fu.rb
require 'thread'
require 'timeout'

@queue = Queue.new

def read(timeout)
  begin
    Timeout::timeout(timeout) do
      puts("READ: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
      return @queue.deq
    end
  rescue Timeout::Error
    puts("TIMEOUT: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
  end
end

and this (/usr/lib/ruby/1.8/thread.rb):

280 @waiting.push Thread.current

because of this (/usr/lib/ruby/1.8/timeout.rb):

40 y = Thread.start {
41 sleep sec
42 x.raise exception, "execution expired" if x.alive?
43 }
46 ensure
47 y.kill if y and y.alive?

The key line is 280. The problem seems to come from @waiting being an
array which holds on to references to the threads created in line 40
(which is also why I couldn't find it because I was tracing for
Thread#new and not Thread#start doh!!). Even though the thread seems to
definitely be killed in line 47, the array still holds a reference to
it, so I'm guessing that like Java, this prevents garbage collection for
a while. Therefore, when I was looking at the list, the threads created
here were still in it.

Yes. A ThreadGroup doesn't have the GC problem, but would not be
suitable here. (A Thread can only belong to one ThreadGroup, and
Queue shouldn't reorganize threads it didn't create.)

I'm a bit worried about this block in thread.rb, though:

257 t = @waiting.shift
258 t.wakeup if t

because I really don't like what I've observed happening in #257/8
here. Based on what I'm doing, the waiting array will eventually get
huge... and I mean, HUGE. Also, based on further experiments, adding
items to the queue will reduce @waiting.length by n, however there will
be a lot more attempted reads than there will be attempted writes to the
queue.

More job runners than jobs?

I'm open to suggestions, but I can't just do it without the timer
because I need to get control back every n seconds so I can do things
like graceful shutdown, etc.

If you need safe concurrent access like a Queue and timeouts you may
find rinda/tuplespace.rb useful. Somewhere around here I have a
stream implementation for it (but it's not terribly difficult to write
from scratch). I think it can be modified to have timeouts on pop.

(I have RDoc patches for rinda out for review.)

Here's the full test program (not out to win any style awards with the
calls to read, btw) :)

Give me a bit and I think I can make your test program work with
TupleSpace streams.
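
The Thread#new versus Thread#start point raised earlier in this message can be sketched with a subclass. Ruby's documentation for Thread.start says that calling it on a subclass does not invoke the subclass's initialize, and the thread reports the same effect with a reopened Thread class. CountingThread is a hypothetical name used only for this illustration.

```ruby
# Hypothetical subclass that counts how often its initialize runs.
class CountingThread < Thread
  @created = 0

  class << self
    attr_accessor :created
  end

  def initialize(*args, &block)
    CountingThread.created += 1
    super   # forwards the same arguments and block to Thread#initialize
  end
end

CountingThread.new { }.join     # goes through initialize
CountingThread.start { }.join   # documented to skip the subclass initialize

puts "initialize calls: #{CountingThread.created}"
```

Two threads ran here, but only the one built with new was counted, which is the kind of undercount that instrumenting initialize alone produces when a library such as timeout.rb uses Thread.start.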
 
Lyndon Samson

With all the talk about Muds I was hoping the Subject was referring to a
Roguelike game in Ruby.. :)
 
Eric Hodel


If you need safe concurrent access like a Queue and timeouts you
may find rinda/tuplespace.rb useful. Somewhere around here I have
a stream implementation for it (but it's not terribly difficult to
write from scratch). I think it can be modified to have timeouts
on pop.

Give me a bit and I think I can make your test program work with
TupleSpace streams.

$ cat test.rb
require 'ts_stream'

ts = Rinda::TupleSpace.new 1
stream = Rinda::Stream.new ts, 1

def read(stream, timeout)
  puts "READ: #{stream.length} elements"
  puts "*** #{stream.pop timeout} ***"
rescue Rinda::Stream::ClosedError
  puts "CLOSED: #{stream.length} elements"
rescue Rinda::Stream::TimeoutError
  puts "TIMEOUT: #{stream.length} elements"
end

6.times { read stream, 1 }
stream.push "one"
6.times { read stream, 1 }

p Thread.list

$ ruby test.rb
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 1 elements
*** one ***
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
[#<Thread:0x33898c sleep>, #<Thread:0x1c1740 run>]


Attachment: ts_stream.rb

require 'rinda/tuplespace'

##
# streams will be stored as [:stream, stream_index, position, value]

class Rinda::Stream

  ##
  # Raised when attempting to pop or push onto a closed Stream.

  class ClosedError < RuntimeError; end

  ##
  # Raised when a Stream operation times out.

  class TimeoutError < RuntimeError; end

  ##
  # The number of this stream.

  attr_reader :stream_id

  ##
  # Creates a new stream on +ts+

  def initialize(ts, stream_id)
    @ts = ts
    @stream_id = stream_id

    begin
      head_tup = @ts.read [:stream, @stream_id, :head, nil], true
    rescue Rinda::RequestExpiredError
      @ts.write [:stream, @stream_id, :head, 0]
      @ts.write [:stream, @stream_id, :tail, 0]
    end
  end

  def length
    tail_index = @ts.read([:stream, @stream_id, :tail, nil]).last
    head_index = @ts.read([:stream, @stream_id, :head, nil]).last
    length = tail_index - head_index

    return length < 0 ? 0 : length
  end

  def push(value)
    index = @ts.take([:stream, @stream_id, :tail, nil]).last

    if index.nil? then
      @ts.write [:stream, @stream_id, :tail, nil]
      raise ClosedError
    end

    @ts.write [:stream, @stream_id, :tail, index + 1]
    @ts.write [:stream, @stream_id, index, value]

    return value
  end

  ##
  # Removes the oldest value from the stream. +sec+ can be a time-to-wait in
  # seconds or a Renewer object (see Rinda documentation). #pop can wait up
  # to twice the value of +sec+.

  def pop(sec = nil)
    # Find the head
    begin
      index = @ts.take([:stream, @stream_id, :head, nil], sec).last
    rescue Rinda::RequestExpiredError
      raise TimeoutError
    end

    # Check if the stream is closed
    begin
      last = @ts.read [:stream, @stream_id, :closed, nil], true
    rescue Rinda::RequestExpiredError
      last = nil
    else
      last = last.last
    end

    if index == last then
      @ts.write [:stream, @stream_id, :head, index]
      raise ClosedError
    end

    # Grab our value
    begin
      value = @ts.take([:stream, @stream_id, index, nil], sec).last
    rescue Rinda::RequestExpiredError
      @ts.write [:stream, @stream_id, :head, index]
      raise TimeoutError
    else
      @ts.write [:stream, @stream_id, :head, index + 1]
    end

    return value
  end

  ##
  # Closes the stream for any further writing. Reading may continue until the
  # stream is empty. An already blocked #pop will remain blocked.

  def close
    index = @ts.take([:stream, @stream_id, :tail, nil]).last
    @ts.write [:stream, @stream_id, :tail, nil]
    @ts.write [:stream, @stream_id, :closed, index]

    return nil
  end

end

if __FILE__ == $0 then
  require 'test/unit'

  class Rinda::StreamTest < Test::Unit::TestCase

    def setup
      @ts = Rinda::TupleSpace.new 1
      @stream = Rinda::Stream.new @ts, 0
    end

    def test_create
      assert @stream, "Stream must not be nil"
    end

    def test_put_get
      @stream.push 0
      @stream.push 1
      assert_equal 0, @stream.pop
      assert_equal 1, @stream.pop
    end

    def test_length
      assert_equal 0, @stream.length

      @stream.push 0
      assert_equal 1, @stream.length

      @stream.push 1
      assert_equal 2, @stream.length

      assert_equal 0, @stream.pop
      assert_equal 1, @stream.length

      assert_equal 1, @stream.pop
      assert_equal 0, @stream.length

      begin
        @stream.pop true
      rescue Rinda::Stream::TimeoutError
      end

      assert_equal 0, @stream.length
    end

    def test_close
      @stream.push 0
      @stream.close
      assert_raises Rinda::Stream::ClosedError do
        @stream.push 1
      end
      assert_equal 0, @stream.pop, "Finish reading from the stream"
      assert_raises Rinda::Stream::ClosedError do
        @stream.pop
      end
    end

    def test_pop_timeout
      assert_raises Rinda::Stream::TimeoutError do
        @stream.pop true
      end
      assert_raises Rinda::Stream::TimeoutError do
        @stream.pop 1
      end

      @stream.push :value
      assert_nothing_raised do
        assert_equal :value, @stream.pop(1)
      end
    end

    def test_concurrency_but_not_really
      push_threads = []
      pop_threads = []
      value_one = nil
      value_two = nil

      push_threads << Thread.start do
        @stream.push 0
      end

      push_threads << Thread.start do
        @stream.push 1
      end

      pop_threads << Thread.start do
        value_one = @stream.pop
      end

      pop_threads << Thread.start do
        value_two = @stream.pop
      end

      push_threads.each do |t| t.join end
      @stream.close
      pop_threads.each do |t| t.join end

      assert_equal [0, 1], [value_one, value_two].sort
    end

  end
end




--
Eric Hodel - (e-mail address removed) - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04


 
Caleb Clausen

Lyndon said:
With all the talk about Muds I was hoping the Subject was referring to a
Roguelike game in Ruby.. :)

I'm with you, brother! What say we write one; we can use dwemthy as a core... ;)
 
Eric Hodel

Even better with renewer objects.

require 'ts_stream'

ts = Rinda::TupleSpace.new 1
stream = Rinda::Stream.new ts, 1

class Renewer
  attr_accessor :shutdown
  def initialize() @shutdown = false end
  def renew() return !@shutdown end
end

renewer = Renewer.new

def read(stream, timeout)
  puts "READ: #{stream.length} elements"
  puts "*** #{stream.pop timeout} ***"
rescue Rinda::Stream::ClosedError
  puts "CLOSED: #{stream.length} elements"
rescue Rinda::Stream::TimeoutError
  puts "TIMEOUT: #{stream.length} elements"
end

6.times { Thread.start do read stream, renewer end }
stream.push "one"

puts "#{Thread.list.length} live threads"

renewer.shutdown = true

sleep 2

puts "#{Thread.list.length} live threads"
 
A

Andrew S. Townley

Hi Eric,

Sorry for the late reply--haven't been able to keep up with the list
traffic.

I hadn't considered any of the Rinda stuff because what I'm trying to
model is queues. The solution that I have in place with the
TimedReadQueue and the ThreadPool (slightly modified) works.
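
The TimedReadQueue itself is not shown in this thread. As a rough idea of what such a queue could look like on a current Ruby, here is a sketch built on Mutex and ConditionVariable#wait with a timeout, so no helper thread is started per read. The class body, the method names and the choice to return nil on timeout are all assumptions for illustration, not the implementation described above.

```ruby
# Hypothetical sketch of a queue whose pop gives up after a timeout.
class TimedReadQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @ready = ConditionVariable.new
  end

  # Append an item and wake one waiting reader, if any.
  def push(item)
    @mutex.synchronize do
      @items << item
      @ready.signal
    end
    self
  end
  alias_method :<<, :push

  # Return the oldest item, or nil if nothing arrives within +timeout+ seconds.
  def pop(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        # Releases the mutex while waiting and reacquires it on wake-up.
        @ready.wait(@mutex, remaining)
      end
      @items.shift
    end
  end
end

queue = TimedReadQueue.new
threads_before = Thread.list.size

first  = queue.pop(0.05)   # nothing queued yet
queue << "one"
second = queue.pop(0.05)

puts "first: #{first.inspect}, second: #{second.inspect}"
puts "extra threads created: #{Thread.list.size - threads_before}"
```

A worker loop can call pop with a short timeout, check a shutdown flag whenever it gets nil back, and go around again, which covers the "get control back every n seconds" requirement without Timeout.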

Not knowing much about the Rinda package, I assume the Renewer critter
is some sort of implicit interface used by Rinda to determine if it
should timeout or close the stream? Overall, I guess it just proves
that there's always more than one solution to a given problem. :)

Thanks for all your help with this. I'm sure I'll have more questions
as I get further into Ruby.

Cheers,

ast

Eric Hodel

Not knowing much about the Rinda package, I assume the Renewer critter
is some sort of implicit interface used by Rinda to determine if it
should timeout or close the stream?

It's more low-level than that; it can be used to expire the Tuple it
is attached to.
 
