Joel said:
Proposed fix, using a condition var... still needs some eyeballing and
some tests:
That was not quite right either (because cond.signal only wakes the
waiter, and doesn't schedule it). The following seems to complete
without deadlocks or starvation.
require 'thread'
require 'rbtree'
class PriorityQueue
def size
@tree.size
end
def initialize(*)
super
@tree = MultiRBTree.new
@mutex = Mutex.new
@cond = ConditionVariable.new
end
# Push +obj+ with priority equal to +pri+ if given or, otherwise,
# the result of sending #queue_priority to +obj+. Objects are
# dequeued in priority order, and first-in-first-out among objects
# with equal priorities.
def push(obj, pri = obj.queue_priority)
@mutex.synchronize do
@tree.store(pri, obj)
@cond.signal
end
end
def pop(non_block=false)
@mutex.synchronize do
if (
[email protected])
return @tree.delete(last[0]) # highest key, oldest first
end
if non_block
raise ThreadError, "priority queue empty"
end
loop do
@cond.wait(@mutex)
if (
[email protected])
return @tree.delete(last[0])
end
end
end
end
end
if __FILE__ == $0
Thread.abort_on_exception = true
pq = PriorityQueue.new
n_items_per_thread = 1000
n_writers = 10
n_readers = 10
writers = (0...n_writers).map do |i_thr|
Thread.new do
n_items_per_thread.times do |i|
pri = rand(10)
pq.push([pri, i, i_thr], pri)
Thread.pass if rand(5) == 0
end
end
end
sleep 0.1 until pq.size > 100 # a little head start populating the tree
results = Array.new(n_readers, 0)
readers = (0...n_readers).map do |i|
Thread.new do
loop do
pq.pop
results
+= 1
end
end
end
writers.each do |wr|
wr.join
end
p results
until pq.size == 0
sleep 0.1
p results
end
raise unless results.inject {|s,x|s+x} == n_items_per_thread * n_writers
end