Michael Malone
Hi All,
I am really struggling with a really intermittent bug. I have a thread
pool set up in a producer-consumer fashion. General semantics are:
1) initialize thread_pool, with each thread waiting on a synchronized
Queue.pop call to get a Proc object to run.
2) call run_thread(&block), adding each job (block/Proc object) into a
Queue if all threads are busy
3) call ThreadPool#join which enqueues a 'nil' object for each thread,
meaning that Queue.pop will return something that will evaluate to
false, signaling the threads that there is no more work.
4) If any Exceptions accumulated when the blocks were running, process them.
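The four steps above can be sketched roughly as follows. This is only a minimal sketch, not the actual pool: the class name SketchPool and its internals are hypothetical, and for simplicity every job goes through the Queue rather than only when all threads are busy.

```ruby
require 'thread' # Queue; needed on older Rubies, harmless on newer ones

# Hypothetical minimal pool; all names here are illustrative only.
class SketchPool
  def initialize(size)
    @jobs   = Queue.new
    @failed = Queue.new # thread-safe store for exceptions raised by jobs
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until something arrives; nil means "no more work".
        while (job = @jobs.pop)
          begin
            job.call
          rescue Exception => error
            @failed << error
          end
        end
      end
    end
  end

  # Step 2: hand a block to the pool.
  def run_thread(&block)
    @jobs << block
  end

  # Steps 3 and 4: one nil per thread, wait, then return collected exceptions.
  def join
    @threads.size.times { @jobs << nil }
    @threads.each(&:join)
    errors = []
    errors << @failed.pop until @failed.empty?
    errors
  end
end
```

With this sketch, calling pool.run_thread { ... } a few times and then pool.join returns an Array of whatever exceptions the blocks raised.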
Now, that much works perfectly well. No problems. It's only when I try
to make things a little more clever that things start going wrong. I
should point out that I did have this working on Ruby v1.8.7; it was
only when I tried to run it on v1.9.1 that things stopped working. To
me, this indicates that I had been behaving badly and that the native
threads are less forgiving than the green threads.
The clever part I mentioned is that I added the ability to spawn a new
process and run the task inside that, with one of the threads in the
pool simply taking care of the admin: collecting the status when it's
all done and using a pipe to Marshal any exceptions back so they can be
accumulated for the join. For the most part this works. But sometimes
the whole thing hangs, waiting for the last thread to complete (when it
fails, it is always exactly one thread that doesn't complete, but I
haven't found any other consistency in the failure conditions).
Here is the code for the run_process portion. Some of you might
recognise parts of it. I have been trying to come up with a simple case
to narrow the problem down, but that doesn't seem to be an option.
Also, if I remove the Exception handling code things still fail like a
champion, so I presume it is something to do with the fork()/Thread
combination stopping a mutex somewhere from working correctly, but I
have no idea how.
# Positive values of priority_increment cause the scheduler to
# favour this subprocess less.
# The default priority is 0 and it goes to a maximum of 19.
# You need root privileges to use negative increments.
# [+priority_increment+] The amount by which to lower the process's
#                        priority
# [+block+] The block to run inside the new process
def run_process(priority_increment = 0, &block)
  run do
    read_end, write_end = IO.pipe
    pid = fork do
      begin
        read_end.close # the child only writes
        current_priority =
          Process.getpriority(Process::PRIO_PROCESS, 0)
        Process.setpriority(Process::PRIO_PROCESS, 0,
                            current_priority + priority_increment)
        block.call
      rescue SystemExit => ignore
        # a deliberate exit in the child is not reported as a failure
      rescue Exception => error
        # send the exception back to the parent through the pipe
        Marshal.dump(error, write_end)
      ensure
        write_end.close
      end
    end
    write_end.close # the parent only reads
    exception_string = read_end.read
    read_end.close
    pid, status = Process.waitpid2(pid)
    unless exception_string.empty?
      @failed << Marshal.restore(exception_string)
    end
  end
end
(And no, a SystemExit isn't being generated, I have checked that!)
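For anyone who wants to try the fork/pipe/Marshal round trip on its own, outside the pool, it can be sketched like this. This is a minimal sketch under assumptions, not the code above: run_in_child is a hypothetical helper name, the priority handling is left out, and the binmode calls are an addition to keep the marshalled bytes free of any encoding conversion.

```ruby
# Hypothetical helper: runs the given block in a forked child and returns
# [status, error], where error is nil if the block raised nothing.
def run_in_child
  read_end, write_end = IO.pipe
  read_end.binmode
  write_end.binmode
  pid = fork do
    begin
      read_end.close # the child only writes
      yield
    rescue SystemExit
      # a deliberate exit in the child is not treated as a failure
    rescue Exception => error
      Marshal.dump(error, write_end)
    ensure
      write_end.close
    end
  end
  write_end.close         # the parent must close its copy of the write end
  payload = read_end.read # returns at EOF, i.e. once every copy of the
                          # write end (in any process) has been closed
  read_end.close
  _, status = Process.waitpid2(pid)
  error = payload.empty? ? nil : Marshal.load(payload)
  [status, error]
end
```

Calling run_in_child { raise ArgumentError, "boom" } from a single thread should hand back the ArgumentError in the parent; calling it from several threads at once is the situation that matches the pool.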
Please help me!!! I am soooo lost!
Michael
=======================================================================
This email, including any attachments, is only for the intended
addressee. It is subject to copyright, is confidential and may be
the subject of legal or other privilege, none of which is waived or
lost by reason of this transmission.
If the receiver is not the intended addressee, please accept our
apologies, notify us by return, delete all copies and perform no
other act on the email.
Unfortunately, we cannot warrant that the email has not been
altered or corrupted during transmission.
=======================================================================