Raymond Hettinger
I would like to get feedback on an idea I had for simplifying the use
of queues with daemon consumer threads.
Sometimes, I launch one or more consumer threads that wait for a task
to enter a queue and then work on the task. A recurring problem is that
I sometimes need to know if all of the tasks have been completed so I
can exit or do something with the result.
If each thread only does a single task, I can use t.join() to wait
until the task is done. However, if the thread stays alive and waits
for more Queue entries, then there doesn't seem to be a good way to
tell when all the processing is done.
So, the idea is to create a subclass of Queue that increments a counter
when objects are enqueued, that provides a method for worker threads to
decrement the counter when the work is done, and that offers a blocking
join() method that waits until the counter is zero.
There's a working implementation at:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/475160
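For reference, here is a rough sketch of the counter idea (the class
name and the details are only illustrative; the recipe above is the
complete, tested version):

    from Queue import Queue
    from threading import Condition

    class JoinableQueue(Queue):
        # Sketch only:  a count of unfinished tasks guarded by a condition
        def __init__(self, maxsize=0):
            Queue.__init__(self, maxsize)
            self.all_done = Condition()
            self.unfinished = 0

        def put(self, item, block=True, timeout=None):
            self.all_done.acquire()
            try:
                self.unfinished += 1            # one more task outstanding
            finally:
                self.all_done.release()
            Queue.put(self, item, block, timeout)

        def task_done(self):
            # called by a consumer after finishing a task it got with get()
            self.all_done.acquire()
            try:
                self.unfinished -= 1
                if self.unfinished == 0:
                    self.all_done.notifyAll()   # wake threads blocked in join()
            finally:
                self.all_done.release()

        def join(self):
            # block until every put() has been matched by a task_done()
            self.all_done.acquire()
            try:
                while self.unfinished:
                    self.all_done.wait()
            finally:
                self.all_done.release()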
Here is an example:
    def worker():
        while 1:
            task = q.get()
            do_work(task)
            q.task_done()

    # setup queue and launch worker threads
    q = Queue()
    for i in range(num_worker_threads):
        Thread(target=worker).start()

    # load the tasks
    for elem in tasklist:
        q.put(elem)

    # block until they are done
    q.join()

    # proceed with other stuff
    . . .
There are some competing approaches. One is to attach sentinel objects
to the end of the queue as a way for consumer threads to know that they
should shut down. Then a regular t.join() can be used to block until
the consumer threads have shut down. This approach is
straightforward, but it depends on 1) complicating the consumer logic
to include sentinel detection and thread shut-down, 2) complicating the
producer logic to append one sentinel for each consumer when the data
stream is done, and 3) actually knowing when the data stream is done. The
last point is a problem in one of my programs because the consumer
sometimes uses a divide-and-conquer approach, resulting in two new
subtasks being loaded onto the queue. IOW, the only way to know when all
the inputs have been handled is to have the queue empty and all
consumers inactive (i.e. processing complete).
    def worker():
        while 1:
            task = q.get()
            if task is None:        # check for sentinel
                return
            do_work(task)

    # setup queue and launch worker threads
    q = Queue()
    threads = []
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        threads.append(t)
        t.start()

    # load the tasks
    for elem in tasklist:
        q.put(elem)
    for i in range(num_worker_threads):
        q.put(None)                 # load sentinels

    # block until they are done
    for t in threads:
        t.join()

    # proceed with other stuff
    . . .
Another approach is to set up a second queue for results. Each
consumer loads a result when it is done processing an input. The
producer or main thread then becomes responsible for matching all
inputs to the corresponding results. This isn't complicated
in practice, but it isn't pretty either:
    def worker():
        while 1:
            task = tasks_in.get()
            do_work(task)
            tasks_out.put(None)     # enqueue a None result when done with task

    # setup queues and launch worker threads
    tasks_in = Queue()
    tasks_out = Queue()
    for i in range(num_worker_threads):
        Thread(target=worker).start()

    # load the tasks
    n = len(tasklist)
    for elem in tasklist:
        tasks_in.put(elem)

    # block until they are done
    for i in range(n):
        tasks_out.get()

    # proceed with other stuff
    . . .
This approach becomes messier if the task loading occurs at multiple
points inside a more complex producer function. Also, it doesn't work
well with the worker-thread divide-and-conquer situation discussed
above.
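For what it's worth, the divide-and-conquer case is exactly where the
counter approach shines: a worker that splits a task simply put()s the
subtasks back onto the same queue before calling task_done(), so the
count of unfinished tasks cannot reach zero until the subtasks have also
been processed. A hypothetical sketch (is_divisible() and split_task()
are stand-ins, like do_work() above):

    def worker():
        while 1:
            task = q.get()
            if is_divisible(task):              # hypothetical test
                for subtask in split_task(task):
                    q.put(subtask)              # each put() bumps the counter
            else:
                do_work(task)
            q.task_done()                       # one decrement per task taken

    # setup queue and launch worker threads
    q = Queue()
    for i in range(num_worker_threads):
        Thread(target=worker).start()

    # load the initial tasks
    for elem in tasklist:
        q.put(elem)

    # block until the tasks and all generated subtasks are done
    q.join()

Note that the subtasks are put() before task_done() is called for the
parent task; that ordering keeps the counter from momentarily hitting
zero and waking the join() too early.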