C
castillo.bryan
I wanted to create a thread pool. I know I could have used a
SizedQueue in the thread pool, but I wanted to later on change the
thread queue to a priority queue.
With ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-cygwin] I get a
deadlock error. ruby 1.8.5 (2006-08-25) [i386-mswin32] is not giving
me that error. I think that my code is written correctly and that
this may be a bug in the version/build of ruby I am running on. Does
anybody see a problem with this code? Thanks.
require 'thread'
class ThreadPool
def initialize(thread_size=10, queue_size=100)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = []
@max_queue_size = queue_size
@threads = []
thread_size.times { @threads << Thread.new { start_worker } }
end
def add_work(*args, &callback)
push_task(Task.new(*args, &callback))
end
def push_task(task)
puts "#{Thread.current}|push_task|sync on @mutex"
@mutex.synchronize do
while @max_queue_size > 0 && @queue.size >= @max_queue_size do
puts "#{Thread.current}|push_task|wait on @mutex"
@cv.wait(@mutex)
end
@queue.push(task)
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|push_task|done with sync on @mutex"
task
end
def pop_task
task = nil
puts "#{Thread.current}|pop_task|sync on @mutex"
@mutex.synchronize do
while @queue.size == 0 do
puts "#{Thread.current}|pop_task|wait on @mutex"
@cv.wait(@mutex)
end
task = @queue.shift
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|pop_task|done with sync on @mutex"
task
end
def start_worker
puts "#{Thread.current} running worker"
while true
task = pop_task
return if task == :stop
task.execute
end
end
class Task
attr_reader :result, :exception
def initialize(*args, &callback)
@args = args
@callback = callback
@done = false
@result = nil
@exception = nil
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def execute
begin
@result = @callback.call(*@args)
rescue Exception => e
@exception = e
STDERR.puts "Error in thread #{Thread.current} - #{e}"
e.backtrace.each { |element| STDERR.puts(element) }
end
puts "#{Thread.current}|execute|sync on @mutex"
@mutex.synchronize do
@done = true
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|execute|done with sync on @mutex"
end
def join
puts "#{Thread.current}|join|sync on @mutex"
@mutex.synchronize do
while !@done
puts "#{Thread.current}|join|wait on @mutex"
@cv.wait(@mutex)
end
end
puts "#{Thread.current}|join|done with sync on @mutex"
end
end
end
tasks = []
tp = ThreadPool.new(3, 10)
sleep(1)
100.times do |id|
STDERR.puts "adding work"
tasks << tp.add_work do
puts "Running #{id} #{Thread.current}"
sleep 2
puts "Ending #{id} #{Thread.current}"
Time.now
end
end
puts "Waiting for tasks to complete"
tasks.each do |task|
task.join
if !task.exception.nil?
puts "Failed task - #{task.exception}"
else
puts "Result - #{task.result}"
end
end
output:
--------------------------
$ ruby -v
ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-cygwin]
[/users/bcastill/tpool]
$ ruby thread_pool.rb
#<Thread:0x1002b63c> running worker
#<Thread:0x1002b63c>|pop_task|sync on @mutex
#<Thread:0x1002b63c>|pop_task|wait on @mutex
#<Thread:0x1002b4fc> running worker
#<Thread:0x1002b4fc>|pop_task|sync on @mutex
#<Thread:0x1002b4fc>|pop_task|wait on @mutex
#<Thread:0x1002b3d0> running worker
#<Thread:0x1002b3d0>|pop_task|sync on @mutex
#<Thread:0x1002b3d0>|pop_task|wait on @mutex
adding work
#<Thread:0x1003c964>|push_task|sync on @mutex
#<Thread:0x1003c964>|execute|broadcast on @mutex
#<Thread:0x1002b3d0>|execute|broadcast on @mutex
#<Thread:0x1003c964>|push_task|done with sync on @mutex
adding work
#<Thread:0x1003c964>|push_task|sync on @mutex
#<Thread:0x1002b4fc>|pop_task|wait on @mutex
#<Thread:0x1002b3d0>|pop_task|done with sync on @mutex
Running 0 #<Thread:0x1002b3d0>
Ending 0 #<Thread:0x1002b3d0>
#<Thread:0x1002b3d0>|execute|sync on @mutex
#<Thread:0x1002b3d0>|execute|broadcast on @mutex
#<Thread:0x1002b3d0>|execute|done with sync on @mutex
#<Thread:0x1002b3d0>|pop_task|sync on @mutex
#<Thread:0x1002b3d0>|pop_task|wait on @mutex
deadlock 0x1002b4fc: sleep:- - thread_pool.rb:39
deadlock 0x1002b63c: sleep:- - thread_pool.rb:39
deadlock 0x1003c964: sleep:- (main) - thread_pool.rb:20
deadlock 0x1002b3d0: sleep:- - thread_pool.rb:39
thread_pool.rb:39:in `push_task': Thread(0x1002b3d0): deadlock (fatal)
from thread_pool.rb:15:in `add_work'
from thread_pool.rb:109
from thread_pool.rb:107:in `times'
from thread_pool.rb:107
SizedQueue in the thread pool, but I wanted to later on change the
thread queue to a priority queue.
With ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-cygwin] I get a
deadlock error. ruby 1.8.5 (2006-08-25) [i386-mswin32] is not giving
me that error. I think that my code is written correctly and that
this may be a bug in the version/build of ruby I am running on. Does
anybody see a problem with this code? Thanks.
require 'thread'
class ThreadPool
def initialize(thread_size=10, queue_size=100)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = []
@max_queue_size = queue_size
@threads = []
thread_size.times { @threads << Thread.new { start_worker } }
end
def add_work(*args, &callback)
push_task(Task.new(*args, &callback))
end
def push_task(task)
puts "#{Thread.current}|push_task|sync on @mutex"
@mutex.synchronize do
while @max_queue_size > 0 && @queue.size >= @max_queue_size do
puts "#{Thread.current}|push_task|wait on @mutex"
@cv.wait(@mutex)
end
@queue.push(task)
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|push_task|done with sync on @mutex"
task
end
def pop_task
task = nil
puts "#{Thread.current}|pop_task|sync on @mutex"
@mutex.synchronize do
while @queue.size == 0 do
puts "#{Thread.current}|pop_task|wait on @mutex"
@cv.wait(@mutex)
end
task = @queue.shift
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|pop_task|done with sync on @mutex"
task
end
def start_worker
puts "#{Thread.current} running worker"
while true
task = pop_task
return if task == :stop
task.execute
end
end
class Task
attr_reader :result, :exception
def initialize(*args, &callback)
@args = args
@callback = callback
@done = false
@result = nil
@exception = nil
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def execute
begin
@result = @callback.call(*@args)
rescue Exception => e
@exception = e
STDERR.puts "Error in thread #{Thread.current} - #{e}"
e.backtrace.each { |element| STDERR.puts(element) }
end
puts "#{Thread.current}|execute|sync on @mutex"
@mutex.synchronize do
@done = true
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|execute|done with sync on @mutex"
end
def join
puts "#{Thread.current}|join|sync on @mutex"
@mutex.synchronize do
while !@done
puts "#{Thread.current}|join|wait on @mutex"
@cv.wait(@mutex)
end
end
puts "#{Thread.current}|join|done with sync on @mutex"
end
end
end
tasks = []
tp = ThreadPool.new(3, 10)
sleep(1)
100.times do |id|
STDERR.puts "adding work"
tasks << tp.add_work do
puts "Running #{id} #{Thread.current}"
sleep 2
puts "Ending #{id} #{Thread.current}"
Time.now
end
end
puts "Waiting for tasks to complete"
tasks.each do |task|
task.join
if !task.exception.nil?
puts "Failed task - #{task.exception}"
else
puts "Result - #{task.result}"
end
end
output:
--------------------------
$ ruby -v
ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-cygwin]
[/users/bcastill/tpool]
$ ruby thread_pool.rb
#<Thread:0x1002b63c> running worker
#<Thread:0x1002b63c>|pop_task|sync on @mutex
#<Thread:0x1002b63c>|pop_task|wait on @mutex
#<Thread:0x1002b4fc> running worker
#<Thread:0x1002b4fc>|pop_task|sync on @mutex
#<Thread:0x1002b4fc>|pop_task|wait on @mutex
#<Thread:0x1002b3d0> running worker
#<Thread:0x1002b3d0>|pop_task|sync on @mutex
#<Thread:0x1002b3d0>|pop_task|wait on @mutex
adding work
#<Thread:0x1003c964>|push_task|sync on @mutex
#<Thread:0x1003c964>|execute|broadcast on @mutex
#<Thread:0x1002b3d0>|execute|broadcast on @mutex
#<Thread:0x1003c964>|push_task|done with sync on @mutex
adding work
#<Thread:0x1003c964>|push_task|sync on @mutex
#<Thread:0x1002b4fc>|pop_task|wait on @mutex
#<Thread:0x1002b3d0>|pop_task|done with sync on @mutex
Running 0 #<Thread:0x1002b3d0>
Ending 0 #<Thread:0x1002b3d0>
#<Thread:0x1002b3d0>|execute|sync on @mutex
#<Thread:0x1002b3d0>|execute|broadcast on @mutex
#<Thread:0x1002b3d0>|execute|done with sync on @mutex
#<Thread:0x1002b3d0>|pop_task|sync on @mutex
#<Thread:0x1002b3d0>|pop_task|wait on @mutex
deadlock 0x1002b4fc: sleep:- - thread_pool.rb:39
deadlock 0x1002b63c: sleep:- - thread_pool.rb:39
deadlock 0x1003c964: sleep:- (main) - thread_pool.rb:20
deadlock 0x1002b3d0: sleep:- - thread_pool.rb:39
thread_pool.rb:39:in `push_task': Thread(0x1002b3d0): deadlock (fatal)
from thread_pool.rb:15:in `add_work'
from thread_pool.rb:109
from thread_pool.rb:107:in `times'
from thread_pool.rb:107