thread.rb

Adam Bender

I want to modify the Queue class in thread.rb, so I first searched for
the file. I found it in /usr/lib/ruby/1.8/thread.rb, both on a Mac
and a Linux machine. On the Mac, the file was as expected, but on the
Linux machine, it contained only this:
-----------------------
unless defined? Thread
fail "Thread not available for this ruby interpreter"
end

require 'thread.so'
-----------------------
Does this mean the Ruby code for Queue is compiled into a shared
library? Is it possible to edit the code? (I know I can modify the
class from outside the file, and that is probably what I will end up
doing, but I'm rather surprised by the above file contents and would
like to know more about it).

Thanks,

Adam

P.S. On a related note, I want to add a method to the Queue class that
is similar to pop, but takes a timeout value and returns nil if
no item is pushed before timeout seconds have elapsed. Can anyone
suggest something better than the following:

class Queue
  def pop_with_timeout(timeout)
    elapsed = 0.0
    while (Thread.critical = true; @que.empty?)
      @waiting.push Thread.current
      Thread.critical = false
      elapsed += sleep(timeout - elapsed)
      puts elapsed
      return nil if elapsed >= timeout
    end
    @que.shift
  ensure
    Thread.critical = false
  end
end

That was the first thing I came up with, and I'm (again) surprised
that it works - why does sleep ever return less than 1? What
interrupts it?
 
Robert Klemme

2008/3/25, Adam Bender said:
I want to modify the Queue class in thread.rb, so I first searched for
Why?

P.S. On a related note, I want to add a method to the Queue class that
is similar to pop, but takes a timeout value and returns nil if
no item is pushed before timeout seconds have elapsed. Can anyone
suggest something better than the following:

class Queue
  def pop_with_timeout(timeout)
    elapsed = 0.0
    while (Thread.critical = true; @que.empty?)
      @waiting.push Thread.current
      Thread.critical = false
      elapsed += sleep(timeout - elapsed)
      puts elapsed
      return nil if elapsed >= timeout
    end
    @que.shift
  ensure
    Thread.critical = false
  end
end

That was the first thing I came up with, and I'm (again) surprised
that it works - why does sleep ever return less than 1? What
interrupts it?
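
On the sleep question quoted above: as far as I can tell, the sleep is cut short because Queue#push in 1.8's thread.rb takes a thread off @waiting and wakes it. A sleeping thread that is woken with Thread#wakeup or Thread#run returns from sleep early, and sleep returns the number of seconds it actually slept, rounded to an integer. A minimal sketch of that behaviour outside of Queue (the variable names are only for illustration):

```ruby
# A thread that intends to sleep for 10 seconds.
sleeper = Thread.new { sleep(10) }

# Wait until the thread is actually asleep, then wake it early.
sleep(0.01) until sleeper.status == "sleep"
sleeper.wakeup

# Thread#value joins the thread and returns the block's result, which is
# sleep's return value: the whole seconds actually slept, not 10.
slept = sleeper.value
puts "sleep returned #{slept}"
```

Because the return value is rounded, adding it up as "elapsed" only gives a coarse estimate of how long the thread has really waited.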

I opt against using Thread.critical because this is a global exclusive
lock on all threads and it is unlikely to continue to exist in the
light of JRuby and others. I'd rather use Mutex#try_lock or Monitor's
#wait with timeout parameter.

Kind regards

robert
 
Adam Bender

What what? I want to modify the class as described below; I searched
for the file so I could see how pop was implemented and which instance
variables it used.

I opt against using Thread.critical because this is a global exclusive
lock on all threads and it is unlikely to continue to exist in the
light of JRuby and others. I'd rather use Mutex#try_lock or Monitor's
#wait with timeout parameter.

The Queue class uses Thread.critical. Though I guess writing a queue
using a Monitor would be easy enough to do.

Adam
 
MenTaLguY

require 'thread.so'
-----------------------
Does this mean the Ruby code for Queue is compiled into a shared
library? Is it possible to edit the code? (I know I can modify the
class from outside the file, and that is probably what I will end up
doing, but I'm rather surprised by the above file contents and would
like to know more about it).

Yes, it's implemented in C. In JRuby, it's implemented in Java.
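
One way to check this from inside Ruby, rather than by hunting for the file, is Method#source_location. This is only a sketch and it assumes Ruby 1.9 or later, since source_location does not exist on the 1.8 install discussed here; the variable names are made up for the example.

```ruby
require 'thread'

# Method#source_location returns [file, line] for methods defined in Ruby
# source, and nil for methods implemented in C.
location = Queue.instance_method(:pop).source_location

# Newer interpreters may report an internal pseudo-path instead of nil, so
# also check that the reported path is a real file on disk.
editable = !location.nil? && File.file?(location.first)

puts "Queue#pop location: #{location.inspect}"
puts "editable Ruby source on disk: #{editable}"
```

If the last line prints false, there is no Ruby file to edit for that method, and reopening the class from your own code is the practical route.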

I do agree that the following thread.rb operations need to directly
support timeouts, though:

ConditionVariable#wait
Queue#pop
SizedQueue#pop
SizedQueue#push

Unfortunately, there is no _reliable_ way to patch timeouts in after
the fact. The support needs to be added to Ruby core.

In the interim, I think the best you can do is to implement your own
queue data structure which supports timeouts using Mutex,
ConditionVariable and the 'scheduler' gem. For instance:

require 'thread'
require 'scheduler'

class MyQueue
  def initialize
    @lock = Mutex.new
    @values = []
    @readers = []
  end

  class Timeout < Interrupt
  end

  # Represents one thread waiting in pop.
  class Reader
    def initialize
      @condition = ConditionVariable.new
      @state = :waiting
      @value = nil
    end

    def timeout
      @state = :timeout
      @condition.signal
      self
    end

    def value=(value)
      @value = value
      @state = :ready
      @condition.signal
      self
    end

    def wait_for_value(lock)
      @condition.wait(lock) while @state == :waiting
      raise Timeout, "timed out" unless @state == :ready
      @value
    end
  end

  def push(value)
    @lock.synchronize do
      if @readers.empty?
        @values.push value
      else
        # Hand the value to the longest-waiting reader.
        @readers.shift.value = value
      end
    end
    self
  end

  def pop(timeout=nil)
    @lock.synchronize do
      unless @values.empty?
        @values.shift # oldest value first (FIFO)
      else
        reader = Reader.new
        @readers.push reader
        begin
          if timeout
            timeout = Scheduler.after_delay!(timeout) do
              @lock.synchronize do
                reader.timeout if @readers.delete reader
              end
            end
          end
          reader.wait_for_value(@lock)
        ensure
          @readers.delete reader
          timeout.cancel if timeout
        end
      end
    end
  end
end

(This is untested, but should basically do what you need and you're
welcome to use it. Feel free to ask any questions you might have.)

-mental
 
MenTaLguY

I opt against using Thread.critical because this is a global exclusive
lock on all threads and it is unlikely to continue to exist in the
light of JRuby and others. I'd rather use Mutex#try_lock or Monitor's
#wait with timeout parameter.

Agreed, anything using Thread.critical is not very future-proof.

Note, however, that while Monitor is okay under JRuby, in MRI (1.9 at least)
it uses Timeout in an uncontrolled way and is not entirely reliable as a
result. To implement Monitor robustly in JRuby we ended up adding timeout
support to JRuby's ConditionVariable#wait.

*** I would strongly recommend _against_ using any concurrency primitive in
1.9 except for the primitives from thread.rb/thread.so. ***

Given that, the most directly portable way to get a robust queue that
supports timeouts is to roll your own using Mutex, ConditionVariable, and
some safe source of timed events like Scheduler.

-mental
 
MenTaLguY

Note, however, that while Monitor is okay under JRuby, in MRI (1.9 at
least) it uses Timeout in an uncontrolled way and is not entirely reliable
as a result.

I am wrong. Timeouts are currently unimplemented in 1.9's Monitor.
*** I would strongly recommend _against_ using any concurrency primitive
in 1.9 except for the primitives from thread.rb/thread.so. ***

Evidently more things have changed since 1.8 than I had thought, so
this may or may not still hold.

-mental
 
Adam Bender

I do agree that the following thread.rb operations need to directly
support timeouts, though:

ConditionVariable#wait

ConditionVariable#wait appears to support timeouts
(http://www.ruby-doc.org/stdlib/libd...s/MonitorMixin/ConditionVariable.html#M001018).
Do they not work as expected?

I couldn't find the Scheduler gem, so I'm trying to hack something
together based on your code example, but using Monitors. It depends on
ConditionVariable#wait, so it could be totally broken. I didn't
understand the point of the Reader class in your code, since only one
thread at a time can acquire the lock in pop. This problem is closely
related to the example on page 146 of Pickaxe 2nd ed.

require 'monitor'

class MyQueue

  def initialize
    @values = []
    @values.extend(MonitorMixin)
    @cond = @values.new_cond
  end

  def push(value)
    @values.synchronize do
      @values.push value
      @cond.signal
    end
    self
  end

  def pop(timeout=nil)
    ret = nil
    @values.synchronize do
      t = Time.now
      @cond.wait(timeout)
      puts "waited #{Time.now - t}"
      ret = @values.shift unless @values.empty?
    end
    return ret
  end
end

q = MyQueue.new

consumers = (1..3).map do |i|
  Thread.new("consumer #{i}") do |name|
    begin
      obj = q.pop(5)
      puts "#{name} consumed #{obj.inspect}"
      sleep(rand(0.05))
    end until obj == :END_OF_WORK
  end
end

producers = (1..3).map do |i|
  Thread.new("producer #{i}") do |name|
    3.times do |j|
      sleep(1)
      q.push("Item #{j} from #{name}")
    end
  end
end

producers.each { |th| th.join }
consumers.size.times { q.push(:END_OF_WORK) }
consumers.each { |th| th.join }

The problem is that the wait() waits for timeout seconds, even when
something is in the queue.

Thanks for your help,

Adam
 
MenTaLguY

I couldn't find the Scheduler gem

gem install scheduler

I didn't understand the point of the Reader class in your code, since
only one thread at a time can acquire the lock in pop.

The lock is released (by ConditionVariable#wait) while a reader is
waiting (and re-acquired before ConditionVariable#wait returns).
Otherwise writers couldn't get in to write.
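
A small sketch of that point, using only Mutex and ConditionVariable from thread.rb. The variable names (ready, acquired_while_waiting) are made up for the example; it shows a second thread taking the lock while the first is still inside its synchronize block, parked in wait.

```ruby
require 'thread'

lock = Mutex.new
cond = ConditionVariable.new
ready = false
acquired_while_waiting = false

# The waiter holds the lock, then gives it up inside cond.wait.
waiter = Thread.new do
  lock.synchronize do
    cond.wait(lock) until ready
    :woken
  end
end

# Once the waiter is asleep in cond.wait, the lock is free for us to take,
# even though the waiter has not left its synchronize block.
sleep(0.01) until waiter.status == "sleep"
lock.synchronize do
  acquired_while_waiting = true
  ready = true
  cond.signal
end

# The waiter re-acquires the lock before cond.wait returns, then finishes.
result = waiter.value
puts "acquired while waiting: #{acquired_while_waiting}, waiter result: #{result}"
```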

Conceptually, a Queue is actually two queues: a queue of "pushes" which
holds pushed values, and a queue of "pops" which holds threads waiting
for new values to be pushed.

If there have been more pushes than pops, the "pushes" queue will have
entries in it, and if there have been more pops than pushes, the "pops"
queue will have entries in it. Each push tries to take an entry from
the "pops" queue and vice-versa, so that when there have been an equal
number of pushes and pops, both queues will be empty.

Readers are just used to represent entries in the "pops" queue.

-mental
 
MenTaLguY

def push(value)
  @values.synchronize do
    @values.push value
    @cond.signal
  end
  self
end

def pop(timeout=nil)
  ret = nil
  @values.synchronize do
    t = Time.now
    @cond.wait(timeout)
    puts "waited #{Time.now - t}"
    ret = @values.shift unless @values.empty?
  end
  return ret
end

The problem is that the wait() waits for timeout seconds, even when
something is in the queue.

In this case, the problem is that you are always putting the thread to
sleep on the "pops" queue, even when there is already a pushed value
available on the "pushes" queue.

You need to check whether the "pushes" queue is empty and only sleep
when it is:

def pop
  @values.synchronize do
    while @values.empty?
      @cond.wait
    end
    @values.shift
  end
end

Note that I've used a while loop rather than a simple if test; this is
for several reasons, but mainly because it is possible for another
thread to "steal" the value from the "pushes" queue before we finish
waking up. Because of that, we have to check the predicate again after
@cond.wait returns, and potentially go back to sleep.

Implementing this in conjunction with timeout is harder, but possible:

def pop(timeout = nil)
  if timeout
    deadline = Time.now + timeout
  else
    deadline = nil
  end
  @values.synchronize do
    while @values.empty?
      if deadline
        timeout = deadline - Time.now
        return nil if timeout <= 0
      end
      @cond.wait(timeout)
    end
    @values.shift
  end
end

Since this uses the system clock rather than a monotonic timer, it does
still have a bug inasmuch as changing the system time can cause #pop to
wait for too long or too little, but Ruby 1.8 and 1.9 have that same bug
in their thread schedulers anyway.
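
If the interpreter provides a monotonic clock, the deadline arithmetic can use it instead of Time.now. The sketch below assumes Process.clock_gettime with Process::CLOCK_MONOTONIC, which was added to Ruby well after 1.8 and 1.9, so it does not apply to the versions discussed in this thread. The class name MonotonicQueue and the helper monotonic_now are made up for the example, and it uses a separate Monitor object rather than extending the array. The wait itself still relies on whatever timer the interpreter uses internally.

```ruby
require 'monitor'

class MonotonicQueue
  def initialize
    @values = []
    @monitor = Monitor.new
    @cond = @monitor.new_cond
  end

  def push(value)
    @monitor.synchronize do
      @values.push(value)
      @cond.signal
    end
    self
  end

  # Returns the oldest value, or nil if none arrives within timeout seconds.
  # With timeout = nil it waits indefinitely.
  def pop(timeout = nil)
    deadline = timeout && monotonic_now + timeout
    @monitor.synchronize do
      while @values.empty?
        remaining = deadline && deadline - monotonic_now
        return nil if remaining && remaining <= 0
        @cond.wait(remaining)
      end
      @values.shift
    end
  end

  private

  # Seconds from a clock that is not affected by system time changes.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

Usage is the same as the version above: q.pop(5) returns a pushed value as soon as one is available and nil once five seconds pass without one.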

-mental
 
MenTaLguY

My apologies, I didn't read your previous replies thoroughly enough.
Anyways, I'm not worried about 1.9 portability just yet, I just need
something that works in 1.8.6.

Monitor should work then, although the implementation of timeouts is
a little flaky in 1.8. It should work reliably in JRuby though.

-mental
 
Adam Bender

gem install scheduler

Hmm. That didn't work the first time I tried it (the gem wasn't
found), but it worked just now.

In this case, the problem is that you are always putting the thread to
sleep on the "pops" queue, even when there is already a pushed value
available on the "pushes" queue.

I realized later on that that was probably the problem, and indeed it was.

Implementing this in conjunction with timeout is harder, but possible:

Awesome, this gives me exactly what I need. Thanks for your help.

Adam
 
