Deepak said:
Hi
Please, I need help with the above problem.
Thanks
Deepak
Here's my approach to a similar problem. It is still not as polished as
I'd like, but it may be useful.
The core is the PoolQM class (the CircularBuffer class exists to catch
a limited number of exceptions).
=begin
NAME
class CircularBuffer
DESCRIPTION
A lightweight but (hopefully) thread-safe version of the circular
buffer
Designed primarily for intentionally limited in-memory event/error
logging.
URI
INSTALL
HISTORY
0.1
SYNOPSIS
cb = CircularBuffer.new(50) # create a new CircularBuffer that holds 50 nil elements
cb << 'fnord'               # append an element to the buffer
elements = cb.to_a          # return elements as an array, ordered from oldest to newest
cb.clear                    # force all entries to nil
CAVEATS
The CircularBuffer ignores nil elements and ignores attempts to append
them
2DOs
By Djief
=end
require 'thread'
# A fixed-capacity ring buffer whose state is guarded by a Mutex.
#
# Once the buffer is full, each append overwrites the oldest element.
# nil is used internally to mark an empty slot, so nil elements are
# never stored.
#
class CircularBuffer
  # Create a buffer with 'max_size' slots, all initially empty (nil).
  #
  def initialize(max_size)
    @max_size = max_size
    @ra = Array.new(@max_size)
    @head = 0 # index of the slot the next append will write to
    @mutex = Mutex.new
  end

  # report the capacity the buffer was created with
  #
  def size
    @max_size
  end

  # set all elements to nil
  #
  # Returns self, so the internal storage array is not exposed.
  #
  def clear
    @mutex.synchronize do
      @ra.fill(nil)
      @head = 0
    end
    self
  end

  # append a new element to the current 'end', overwriting the oldest
  # element once the buffer is full
  #
  # nil elements are ignored. A zero-capacity buffer retains nothing
  # (and must not reach the modulo in #inc). Returns self so appends
  # can be chained: cb << a << b
  #
  def <<(element)
    return self if element.nil? || @max_size.zero?

    @mutex.synchronize do
      @ra[@head] = element
      @head = inc(@head)
    end
    self
  end

  # return the entire buffer (except nil elements)
  # as an array ordered from oldest to newest
  #
  def to_a
    # @head must be read under the lock: it is the oldest slot only as
    # long as no concurrent append moves it.
    @mutex.synchronize { @ra.rotate(@head).compact }
  end

  private

  # advance an index by one slot, wrapping at the end of the storage
  #
  def inc(index)
    (index + 1) % @max_size
  end
end
=begin
NAME
class PoolQM
DESCRIPTION
PoolQM pairs a request queue (an Array extended with MonitorMixin) with
a pool of worker threads that wait for, and then process, any requests
that are added to the queue.
Enqueuing a request signals one idle worker thread (if any) to dequeue
and process it. If no idle workers exist, the request remains in the
queue until one is available.
During the creation of a new instance of PoolQM, the number of worker
threads is established and the request processing block is defined:
  results = Queue.new
  NUM_OF_WORKERS = 10
  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    results << "Current request: #{request}" # processing request here
  end
Note that any output you expect to collect from your worker threads
should be returned via some thread-safe mechanism or container (Queue
is a good default).
Enqueuing a request is all that is necessary to initiate its
processing:
  pqm.enq("This is a test, this is only a test")
If a request raises a StandardError within the processing block, the
exception is appended to a circular buffer whose contents can be
obtained as an array with the PoolQM#exceptions method.
If you're interested in logging exceptions, you'll have a bit more work
to do, but replacing the CircularBuffer with a Queue that has its own
worker to handle disk IO is probably a good bet.
Performance-wise this approach behaves more consistently than any I've
produced so far i.e. it's both fast and performs with repeatable
uniformity.
No doubt, there's still room for improvement.
URI
INSTALL
HISTORY
0.1 - genesis
0.2 - documentation and clean-up
SYNOPSIS
  require 'thread'
  results = Queue.new # thread-safe container for results! <<<<<<<<<< IMPORTANT
  NUM_OF_WORKERS = 10
  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    results << "Current request: #{request}" # processing request here
  end
  100.times do |index|
    pqm.enq("Request number #{index}") # enqueuing requests here
  end
  pqm.wait_until_idle # wait for all requests to be processed
  until results.empty? do # dump results
    p results.pop
  end
  pqm.exceptions.each do |exception| # obtain exceptions array and dump it
    p exception
  end
CAVEATS
Only StandardError (and subclasses) raised by the processing block is
captured. Other exceptions (SystemExit, Interrupt, ...) are not
swallowed; they terminate the worker thread that raised them.
2DOs
By Djief
=end
require 'monitor'

class PoolQM
  # default size for the exceptions CircularBuffer
  #
  DEFAULT_EXCEPTION_BUFFER_SIZE = 10

  # Create a new PoolQM with 'worker_count' worker threads to execute
  # the associated block. The block is called with one argument, the
  # request that was enqueued.
  #
  # Raises RuntimeError when no block is given.
  #
  def initialize(worker_count = 1, &handler)
    raise 'block required: { |request| ... }' unless block_given?

    @worker_count = worker_count
    @request_q = []
    @request_q.extend(MonitorMixin)
    # signalled by #enq whenever a request is added
    @request_ready = @request_q.new_cond
    # broadcast by a worker when the queue is empty and nobody is busy
    @pool_idle = @request_q.new_cond
    # number of workers currently running the block; guarded by the
    # @request_q monitor. Tracked explicitly so idleness can be tested
    # without ConditionVariable#count_waiters.
    @busy_workers = 0
    @exceptions_buffer_size = DEFAULT_EXCEPTION_BUFFER_SIZE
    @exceptions = CircularBuffer.new(@exceptions_buffer_size)
    @workers = @worker_count.times.map do
      Thread.new { work_loop(handler) }
    end
  end

  # enq the request data and wake one idle worker (if any)
  #
  # Returns self, so the internal queue is not exposed.
  #
  def enq(request)
    @request_q.synchronize do
      @request_q << request
      @request_ready.signal
    end
    self
  end

  # Wait until all the queued requests have been removed
  # from the request_q && all threads have
  # completed their processing and are idle
  #
  # Workers announce idleness themselves, so this normally returns as
  # soon as the last request finishes; 'wait_resolution' (seconds) is
  # only the upper bound between two re-checks of the idle condition.
  #
  def wait_until_idle(wait_resolution = 0.3)
    @request_q.synchronize do
      @pool_idle.wait(wait_resolution) until idle?
    end
  end

  # create a new exceptions buffer of new_size
  # (previously captured exceptions are discarded)
  #
  def exceptions_buffer_size=(new_size)
    # build the buffer first so a failing constructor leaves the
    # recorded size untouched
    @exceptions = CircularBuffer.new(new_size)
    @exceptions_buffer_size = new_size
  end

  # report the size of the current exceptions buffer
  #
  def exceptions_buffer_size
    @exceptions_buffer_size
  end

  # return the current exceptions buffer as an ordered Array
  #
  def exceptions
    @exceptions.to_a
  end

  private

  # body of every worker thread: take a request, run the block on it,
  # record any StandardError, repeat forever
  #
  def work_loop(handler)
    loop do
      request = next_request
      begin
        handler.call(request)
      rescue StandardError => e
        @exceptions << e
      ensure
        # runs even when a non-StandardError kills this worker, so
        # wait_until_idle cannot hang on a dead thread's busy count
        finish_request
      end
    end
  end

  # block until a request is available, then dequeue it and mark the
  # calling worker as busy (both in one critical section, so the pool
  # never looks idle while a request is in flight)
  #
  def next_request
    @request_q.synchronize do
      # re-check the queue after every wakeup: another worker may have
      # taken the request first
      @request_ready.wait_while { @request_q.empty? }
      @busy_workers += 1
      @request_q.shift
    end
  end

  # mark the calling worker as idle again and wake wait_until_idle
  # callers once nothing is queued or running
  #
  def finish_request
    @request_q.synchronize do
      @busy_workers -= 1
      @pool_idle.broadcast if idle?
    end
  end

  # true when no request is queued and no worker is running the block;
  # must be called with the @request_q monitor held
  #
  def idle?
    @request_q.empty? && @busy_workers.zero?
  end
end
# Demo: runs only when this file is executed directly. Pushes 100
# requests through a 10-worker PoolQM, then prints the collected
# results and the captured exceptions.
if __FILE__ == $0
  # the usual trivial example
  require 'thread'

  # >>>> thread-safe container for result <<<<
  #
  results = Queue.new
  NUM_OF_WORKERS = 10
  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    # simulate random exceptions
    raise "Dummy Exception during #{request}" if rand(10) == 0
    results << "Current request: #{request}" # processing request here
  end
  100.times do |index|
    pqm.enq("Request number #{index}") # enqueuing requests here
  end
  # wait for all requests to be processed
  pqm.wait_until_idle
  # dump results
  until results.empty? do
    p results.pop
  end
  # obtain exceptions array and dump it
  pqm.exceptions.each do |exception|
    p exception
  end
end
Regards,
djief