masher
Hi,
I am trying to implement a multiprocessing pool that assigns tasks
from a blocking queue. My situation is a pretty classic producer/
consumer conundrum, where the producer can produce much faster than
the consumers can consume. The wrinkle in the story is that the
producer produces objects that consume large amounts of memory, so I
would like the queue to block once it holds, say, twice as many tasks
as there are processes in the pool, so that I don't devour my
RAM. I have been able to implement this one way, but I am somewhat
displeased with the result. Code speaks louder than words, so here is
a brief example of the technique I am using, followed by an
explanation of why I am unhappy with it:
===
#!/usr/bin/env python3
import time
import random
from multiprocessing import Pool, Manager

def procfunc(queue):
    time.sleep(random.random() * 2)
    return queue.get() * 2

def printrange(n):
    for i in range(n):
        print("generated " + str(i))
        yield i

if __name__ == "__main__":
    sm = Manager()
    pool = Pool()
    queuelen = len(pool._pool) * 2
    queue = sm.Queue(queuelen)
    for i in printrange(100):
        queue.put(i)  # blocks once queuelen items are pending
        pool.apply_async(procfunc, (queue,), callback=print)
    pool.close()
    pool.join()
===
The reason I am unhappy with this trick is that if you examine the
source code of pool.py, you will note that the Pool class already uses
an internal queue, "_taskqueue", from which tasks are assigned to the
processes in the pool.
Particularly:
def __init__(self, processes=None, initializer=None, initargs=()):
    self._setup_queues()
    self._taskqueue = queue.Queue()
    # ...snip
It seems to me that if I could only subclass and do
    queuelen = len(self._pool) * 2
    self._taskqueue = queue.Queue(queuelen)
later in the constructor, once the pool length has been established, I
would have a much more elegant, transparent solution to the problem.
Unfortunately, the design of the Pool class is such that actually
implementing this solution would be very hackish and inelegant. If
only, say, _setup_queues() were called after the _taskqueue
assignment, then I could override it.
My question, then, is: is there a more elegant/Pythonic way of doing
what I am trying to do with the current Pool class?
If the verdict is no, I'll be happy to file a bug report.