multiprocessing deadlock

B

Brian Quinlan

My test reduction:

import multiprocessing
import queue

def _process_worker(q):
while True:
try:
something = q.get(block=True, timeout=0.1)
except queue.Empty:
return
else:
print('Grabbed item from queue:', something)


def _make_some_processes(q):
processes = []
for _ in range(10):
p = multiprocessing.Process(target=_process_worker, args=(q,))
p.start()
processes.append(p)
return processes

def _do(i):
print('Run:', i)
q = multiprocessing.Queue()
for j in range(30):
q.put(i*30+j)
processes = _make_some_processes(q)

while not q.empty():
pass

# The deadlock only occurs on Mac OS X and only when these lines
# are commented out:
# for p in processes:
# p.join()

for i in range(100):
_do(i)

--------------

Output (on Mac OS X using the svn version of py3k):
% ~/bin/python3.2 moprocessmoproblems.py
Run: 0
Grabbed item from queue: 0
Grabbed item from queue: 1
Grabbed item from queue: 2
....
Grabbed item from queue: 29
Run: 1

At this point the script produces no additional output. If I uncomment
the lines above then the script produces the expected output. I don't
see any docs that would explain this problem and I don't know what the
rule would be e.g. you just join every process that uses a queue before
the queue is garbage collected.

Any ideas why this is happening?

Cheers,
Brian
 
P

paulC

My test reduction:

import multiprocessing
import queue

def _process_worker(q):
     while True:
         try:
             something = q.get(block=True, timeout=0.1)
         except queue.Empty:
             return
         else:
             print('Grabbed item from queue:', something)

def _make_some_processes(q):
     processes = []
     for _ in range(10):
         p = multiprocessing.Process(target=_process_worker, args=(q,))
         p.start()
         processes.append(p)
     return processes

def _do(i):
     print('Run:', i)
     q = multiprocessing.Queue()
     for j in range(30):
         q.put(i*30+j)
     processes = _make_some_processes(q)

     while not q.empty():
         pass

#    The deadlock only occurs on Mac OS X and only when these lines
#    are commented out:
#    for p in processes:
#        p.join()

for i in range(100):
     _do(i)

--------------

Output (on Mac OS X using the svn version of py3k):
% ~/bin/python3.2 moprocessmoproblems.py
Run: 0
Grabbed item from queue: 0
Grabbed item from queue: 1
Grabbed item from queue: 2
...
Grabbed item from queue: 29
Run: 1

At this point the script produces no additional output. If I uncomment
the lines above then the script produces the expected output. I don't
see any docs that would explain this problem and I don't know what the
rule would be e.g. you just join every process that uses a queue before
  the queue is garbage collected.

Any ideas why this is happening?

Cheers,
Brian

I can't promise a definitive answer but looking at the doc.s:-

isAlive()
Return whether the thread is alive.

Roughly, a thread is alive from the moment the start() method
returns until its run() method terminates. The module function
enumerate() returns a list of all alive threads.

I guess that the word 'roughly' indicates that returning from the start
() call does not mean that all the threads have actually started, and
so calling join is illegal. Try calling isAlive on all the threads
before returning from _make_some_processes.

Regards, Paul C.
 
B

Brian Quinlan

My test reduction:

import multiprocessing
import queue

def _process_worker(q):
while True:
try:
something = q.get(block=True, timeout=0.1)
except queue.Empty:
return
else:
print('Grabbed item from queue:', something)

def _make_some_processes(q):
processes = []
for _ in range(10):
p = multiprocessing.Process(target=_process_worker,
args=(q,))
p.start()
processes.append(p)
return processes

def _do(i):
print('Run:', i)
q = multiprocessing.Queue()
for j in range(30):
q.put(i*30+j)
processes = _make_some_processes(q)

while not q.empty():
pass

# The deadlock only occurs on Mac OS X and only when these lines
# are commented out:
# for p in processes:
# p.join()

for i in range(100):
_do(i)

--------------

Output (on Mac OS X using the svn version of py3k):
% ~/bin/python3.2 moprocessmoproblems.py
Run: 0
Grabbed item from queue: 0
Grabbed item from queue: 1
Grabbed item from queue: 2
...
Grabbed item from queue: 29
Run: 1

At this point the script produces no additional output. If I
uncomment
the lines above then the script produces the expected output. I don't
see any docs that would explain this problem and I don't know what
the
rule would be e.g. you just join every process that uses a queue
before
the queue is garbage collected.

Any ideas why this is happening?

Cheers,
Brian

I can't promise a definitive answer but looking at the doc.s:-

isAlive()
Return whether the thread is alive.

Roughly, a thread is alive from the moment the start() method
returns until its run() method terminates. The module function
enumerate() returns a list of all alive threads.

I guess that the word 'roughly' indicates that returning from the
start
() call does not mean that all the threads have actually started, and
so calling join is illegal. Try calling isAlive on all the threads
before returning from _make_some_processes.

Regards, Paul C.

Hey Paul,

I guess I was unclear in my explanation - the deadlock only happens
when I *don't* call join.

Cheers,
Brian
 
P

paulC

Hey Paul,

I guess I was unclear in my explanation - the deadlock only happens  
when I *don't* call join.

Cheers,
Brian

Whoops, my bad.

Have you tried replacing prints with writing a another output Queue?
I'm wondering if sys.stdout has a problem.

Regards, Paul C.
 
B

Brian Quinlan

Whoops, my bad.

Have you tried replacing prints with writing a another output Queue?
I'm wondering if sys.stdout has a problem.

Removing the print from the subprocess doesn't prevent the deadlock.

Cheers,
Brian
 
L

larudwer

Brian Quinlan said:
Any ideas why this is happening?

Cheers,
Brian

IMHO your code is buggy. You run in an typical race condition.

consider following part in your code:
def _make_some_processes(q):
processes = []
for _ in range(10):
p = multiprocessing.Process(target=_process_worker, args=(q,))
p.start()
processes.append(p)
return processes


p.start() may start an process right now, in 5 seconds or an week later,
depending on how the scheduler of your OS works.

Since all your processes are working on the same queue it is -- very --
likely that the first process got started, processed all the input and
finished, while all the others haven't even got started. Though your first
process exits, and your main process also exits, because the queue is empty
now ;).
while not q.empty():
pass

If you where using p.join() your main process wourd terminate when the last
process terminates !
That's an different exit condition!

When the main process terminates all the garbage collection fun happens. I
hope you don't wonder that your Queue and the underlaying pipe got closed
and collected!

Well now that all the work has been done, your OS may remember that someone
sometimes in the past told him to start an process.
def _process_worker(q):
while True:
try:
something = q.get(block=True, timeout=0.1)
except queue.Empty:
return
else:
print('Grabbed item from queue:', something)

The line

something = q.get(block=True, timeout=0.1)

should cause some kind of runtime error because q is already collected at
that time.
Depending on your luck and the OS this bug may be handled or not. Obviously
you are not lucky on OSX ;)

That's what i think happens.
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,955
Messages
2,570,117
Members
46,705
Latest member
v_darius

Latest Threads

Top