Producer-consumer threading problem


George Sakkis

I'd like some feedback on a solution to a variant of the producer-
consumer problem. My first few attempts turned out to deadlock
occasionally; this one seems to be deadlock-free so far but I can't
tell if it's provably correct, and if so, whether it can be
simplified.

The generic producer-consumer situation with unlimited buffer capacity
is illustrated at http://docs.python.org/lib/condition-objects.html.
That approach assumes that the producer will keep producing items
indefinitely, otherwise the consumer ends up waiting forever. The
extension to the problem I am considering requires the consumer to be
notified not only when there is a new produced item, but also when
there is not going to be a new item so that it stops waiting. More
specifically, I want a generator (or iterator class) with the
following generic signature:

def iter_consumed(items, produce, consume):
    '''Return an iterator over the consumed items.

    :param items: An iterable of objects to be `produce`()d and
        `consume`()d.

    :param produce: A callable `f(item)` that produces a single item;
        the return value is ignored. What "produce" exactly means is
        application-specific.

    :param consume: A callable `f()` that consumes a previously produced
        item and returns the consumed item. What "consume" exactly means
        is application-specific. The only assumption is that if `produce`
        is called `N` times, then the next `N` calls to `consume` will
        (eventually, though not necessarily immediately) return, i.e.
        they will not block indefinitely.
    '''

One straightforward approach would be to serialize the problem: first
produce all `N` items and then call consume() exactly N times.
Although this makes the solution trivial, there are at least two
shortcomings. First, the client may have to wait too long for the
first item to arrive. Second, each call to produce() typically
requires resources for the produced task, so the maximum resource
requirement can be arbitrarily high as `N` increases. Therefore
produce() and consume() should run concurrently, with the invariant
that the calls to consume are no more than the calls to produce. Also,
after `N` calls to produce and consume, neither should be left
waiting.

I pasted my current solution at http://codepad.org/FXF2SWmg. Any
feedback, especially if it has to do with proving or disproving its
correctness, will be appreciated.

George
 

Carl Banks

I'd like some feedback on a solution to a variant of the producer-
consumer problem. My first few attempts turned out to deadlock
occasionally; this one seems to be deadlock-free so far but I can't
tell if it's provably correct, and if so, whether it can be
simplified.

The generic producer-consumer situation with unlimited buffer capacity
is illustrated at http://docs.python.org/lib/condition-objects.html.
That approach assumes that the producer will keep producing items
indefinitely, otherwise the consumer ends up waiting forever. The
extension to the problem I am considering requires the consumer to be
notified not only when there is a new produced item, but also when
there is not going to be a new item so that it stops waiting.


Sounds like a sentinel would work for this. The producer puts a
specific object (say, None) in the queue and the consumer checks for
this object and stops consuming when it sees it. But that seems so
obvious I suspect there's something else up.
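The sentinel pattern Carl describes can be sketched as follows (a minimal illustration, using the modern `queue` module name; the thread predates Python 3's rename from `Queue`):

```python
import queue
import threading

def consume_with_sentinel(q, sentinel):
    """Yield items from q until the sentinel appears."""
    while True:
        item = q.get()
        if item is sentinel:
            return
        yield item

q = queue.Queue()
SENTINEL = object()  # a unique object no producer will ever put by accident

def producer(items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)  # signal "no more items"

t = threading.Thread(target=producer, args=([1, 2, 3],))
t.start()
results = list(consume_with_sentinel(q, SENTINEL))
t.join()
```

Using a fresh `object()` as the sentinel (rather than None) avoids clashing with legitimate data, since identity comparison with `is` can never match any other value.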


Carl Banks
 

giltay

Sounds like a sentinel would work for this. The producer puts a
specific object (say, None) in the queue and the consumer checks for
this object and stops consuming when it sees it. But that seems so
obvious I suspect there's something else up.

There's a decent implementation of this in the Python Cookbook,
Second Edition (9.4: Working with a Thread Pool), available from
Safari as a preview:
http://my.safaribooksonline.com/0596007973/pythoncook2-CHP-9-SECT-4

Basically, there's a request_work function that adds (command,
data) pairs to the input Queue. The command 'stop' is used to
terminate each worker thread (there's the sentinel).
stop_and_free_thread_pool() just puts N ('stop', None) pairs and
join()s each thread.

The threadpool put()s the consumed items in an output Queue; they
can be retrieved concurrently using get(). You don't call join()
until you want to stop producing; you can get() at any time.

Geoff Gilmour-Taylor

(I ended up using this recipe in my own code, but with a completely
different stopping mechanism---I'm using the worker threads to control
subprocesses; I want to terminate the subprocesses but keep the worker
threads running---and a callback rather than an output queue.)
 

Carl Banks

I pasted my current solution at http://codepad.org/FXF2SWmg. Any
feedback, especially if it has to do with proving or disproving its
correctness, will be appreciated.


It seems like you're reinventing the wheel. The Queue class does all
this, and it's been thoroughly battle-tested.

So first of all, can you tell us why the following wouldn't work? It
might help us understand the issue you're facing (never mind the
produce and consume arguments for now--I'll cover that below).


def iter_consumed(items):
    q = Queue.Queue()
    sentinel = object()
    def produce_all():
        for item in items:
            q.put(item)
        q.put(sentinel)
    producer = threading.Thread(target=produce_all)
    producer.start()
    try:
        while True:
            item = q.get()
            if item is sentinel:
                return
            yield item
    finally:
        # for robustness, notify producer to wrap things up
        # left as exercise
        producer.join()


If you want to customize the effect of getting and putting, you can
subclass Queue and override the _get and _put methods (however, last
time I checked, the Queue class expects _put to always add an item to
the internal sequence representing the queue--not necessarily to the
top--and _get to always remove an item--not necessarily from the
bottom).

However, even that's only necessary if you want to get items in a
different order than you put them. If you just want to filter items
as they're produced or consumed, you should simply define
produce_filter and consume_filter, that are called before q.put and
after q.get, respectively.
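Carl's filter suggestion might look something like this (a sketch; the class name `FilteredQueue` and the wrapping of `put`/`get` rather than `_put`/`_get` are my own choices):

```python
import queue

class FilteredQueue(queue.Queue):
    """Queue whose items are transformed on the way in and on the way out."""
    def __init__(self, produce_filter=None, consume_filter=None, **kw):
        queue.Queue.__init__(self, **kw)
        self.produce_filter = produce_filter or (lambda x: x)
        self.consume_filter = consume_filter or (lambda x: x)

    def put(self, item, *args, **kw):
        # apply the filter before the item enters the queue
        queue.Queue.put(self, self.produce_filter(item), *args, **kw)

    def get(self, *args, **kw):
        # apply the filter after the item leaves the queue
        return self.consume_filter(queue.Queue.get(self, *args, **kw))

q = FilteredQueue(produce_filter=str.strip, consume_filter=str.upper)
q.put("  hello ")
result = q.get()
```

Wrapping the public `put`/`get` keeps ordering untouched, which sidesteps the caveat above about `_put`/`_get` being expected to add and remove from the internal sequence.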


One issue from your function. This line:

done_remaining[1] += 1

is not atomic, but your code depends on it being so. It can get out
of sync if there is an intervening thread switch between the read and
the write. This was discussed on the list a while back. I posted an
atomic counter object in that thread (it was written in C--no other
way) for which the += is atomic. Otherwise you have to use a lock.
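The lock-based alternative Carl mentions can be sketched like this (the `Counter` class is my own illustration, not his C object):

```python
import threading

class Counter:
    """Counter whose increments are protected by a lock."""
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # The read-modify-write happens entirely under the lock,
        # so no intervening thread switch can lose an update.
        with self._lock:
            self.value += 1

counter = Counter()
threads = [threading.Thread(
               target=lambda: [counter.increment() for _ in range(10000)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the lock, four threads doing 10000 unguarded `+= 1` operations each can end up with a total below 40000, because two threads may read the same old value before either writes back.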


Carl Banks
 

Aahz

I'd like some feedback on a solution to a variant of the producer-
consumer problem. My first few attempts turned out to deadlock
occasionally; this one seems to be deadlock-free so far but I can't
tell if it's provably correct, and if so, whether it can be
simplified.

Take a look at the threading tutorial on my web page, specifically the
threadpool spider.
 

George Sakkis

It seems like you're reinventing the wheel. The Queue class does all
this, and it's been thoroughly battle-tested.

Synchronized queues are an extremely useful data structure in many
situations. The producer/consumer paradigm however is a more general
model and doesn't depend on any specific data structure.
So first of all, can you tell us why the following wouldn't work? It
might help us understand the issue you're facing (never mind the
produce and consume arguments for now--I'll cover that below).

def iter_consumed(items):
    q = Queue.Queue()
    sentinel = object()
    def produce_all():
        for item in items:
            q.put(item)
        q.put(sentinel)
    producer = threading.Thread(target=produce_all)
    producer.start()
    try:
        while True:
            item = q.get()
            if item is sentinel:
                return
            yield item
    finally:
        # for robustness, notify producer to wrap things up
        # left as exercise
        producer.join()

As it is, all this does is yield the original items, but slower, which
is pretty much useless. The whole idea is to transform some inputs to
some outputs. How exactly each input is mapped to an output is
irrelevant at this point; this is the power of the producer/consumer
model. Produce might mean "send an email to address X" and consume
might mean "wait for an automated email response, parse it and return
a value Y". No queue has to be involved; it *may* be involved of
course, but that's an implementation detail, it shouldn't make a
difference to iter_consumed().

If you replace "q.put"/"q.get" with "produce"/"consume" respectively
and make them the last two parameters, you're much closer to my idea.
What's left is getting rid of the sentinel, since the producer and the
consumer may have been written independently, not aware of
iter_consumed. E.g. for the email example, all producers and consumers
(there may be more than one) must agree in advance on a "sentinel
email". For a given situation that might not be a big issue, but then
again, if iter_consumed() is written once without assuming a sentinel,
it makes life easier for all future producers and consumers.
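George's actual solution is behind the (now dead) codepad link, but the sentinel-free design he describes here could be sketched with a condition variable and counters along these lines (a guess at the shape, not his code; the names are my own):

```python
import threading

def iter_consumed(items, produce, consume):
    """Yield one consume() result per produced item, with no sentinel.

    The producer thread counts items as it produces them and raises a
    flag when done; the consumer only calls consume() while its own call
    count is strictly below the producer's, so consume never blocks
    forever waiting for an item that will not come.
    """
    cond = threading.Condition()
    state = {'produced': 0, 'done': False}

    def produce_all():
        for item in items:
            produce(item)
            with cond:
                state['produced'] += 1  # increment protected by the condition
                cond.notify()
        with cond:
            state['done'] = True
            cond.notify()

    threading.Thread(target=produce_all).start()
    consumed = 0
    while True:
        with cond:
            # wait until an unconsumed item exists or production has ended
            while consumed >= state['produced'] and not state['done']:
                cond.wait()
            if consumed >= state['produced'] and state['done']:
                return
        yield consume()  # safe: consumed < produced, so this must return
        consumed += 1

# usage example: a queue happens to implement produce/consume here,
# but any pair satisfying the docstring contract would do
import queue
q = queue.Queue()
result = list(iter_consumed([1, 2, 3], q.put, q.get))
```

Note that nothing in `iter_consumed` knows what `produce` and `consume` do internally; the queue in the usage example is one possible implementation detail, exactly as argued above.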
If you want to customize the effect of getting and putting, you can
subclass Queue and override the _get and _put methods (however, last
time I checked, the Queue class expects _put to always add an item to
the internal sequence representing the queue--not necessarily to the
top--and _get to always remove an item--not necessarily from the
bottom).

Assuming that you're talking about the stdlib Queue.Queue class and
not some abstract Queue interface, extending it may help for limited
customization but can only get you so far. For instance the producer
and the consumer may not live in the same address space, or even in
the same machine.
One issue from your function. This line:

done_remaining[1] += 1

is not atomic, but your code depends on it being so.

No, it doesn't; it is protected by the condition object.

George
 
