producer/consumer remove problem

Jeff

For a specific producer to distribute to several consumers, I have a simple
extension of HashSet of consumers. However, a problem sometimes occurs
while a consumer is removing himself from the HashSet.

The problem occurs when distribute() calls the .eventObserved() method of
the consumer that is trying to remove himself. The .eventObserved() method
never returns. I think that is because the consumer's thread is waiting in
ConsumerSet's remove(). The producing caller and consumer are always
different threads.

To solve the problem, I'm considering having remove() spawn a thread to do
the remove so that remove does not wait on synchronized. distribute() is
called A LOT, but remove() is called rarely.

Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
Is there a better solution, or at least more Java-like?

import java.util.HashSet;
import java.util.Iterator;

public class ConsumerSet extends HashSet {
    public void distribute(Object message) {
        synchronized( this ) {
            for (Iterator i = iterator(); i.hasNext();)
                ((Consumer) i.next()).eventObserved(message);
        }
    }

    public void add( Consumer consumer ) {
        synchronized (this) {
            super.add( consumer );
        }
    }

    public void remove( Consumer consumer ) {
        synchronized ( this ) {
            super.remove( consumer );
        }
    }
} // ConsumerSet

// will usually be an extension of Thread
public interface Consumer {
    /**
     * Invoked when an event is observed
     */
    public void eventObserved( Object toProcess );
}
 
xarax

Jeff said:
For a specific producer to distribute to several consumers, I have a simple
extension of HashSet of consumers. However, a problem sometimes occurs
while a consumer is removing himself from the HashSet.

The problem occurs when distribute() calls the .eventObserved() method of
the consumer that is trying to remove himself. The .eventObserved() method
never returns. I think that is because the consumer's thread is waiting in
ConsumerSet's remove(). The producing caller and consumer are always
different threads.

That's your intent, but likely not what you implemented.

Jeff said:
To solve the problem, I'm considering having remove() spawn a thread to do
the remove so that remove does not wait on synchronized. distribute() is
called A LOT, but remove() is called rarely.

Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
Is there a better solution, or at least more Java-like?

public class ConsumerSet extends HashSet {
public void distribute(Object message) {
synchronized( this ) {
for (Iterator i = iterator(); i.hasNext();)
((Consumer) i.next()).eventObserved(message);
}
}

public void add( Consumer consumer ) {
synchronized (this) {
super.add( consumer );
}
}

public void remove( Consumer consumer ) {
synchronized ( this ) {
super.remove( consumer );
}
}
} // ConsumerSet

// usually be an extension of Thread
public interface Consumer {
/**
* Invoked when an event is observed
*/
public void eventObserved( Object toProcess );
}

This is likely your problem. You seem to have one
thread calling eventObserved(Object), which is defined
in the Thread instance of another thread. Of course,
you should know that this will not cause the eventObserved()
method to run under the other thread. The other thread
must itself call eventObserved().
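
A small sketch may make this point concrete. It assumes a hypothetical WorkerThread class (not from Jeff's code) that extends Thread and records the name of whichever thread actually executes eventObserved(). The names WorkerThread, lastCaller, and CallerThreadDemo are illustrative only.

```java
// Hypothetical demo: a method defined on a Thread subclass still runs
// on the thread that calls it, not on the thread the object represents.
class WorkerThread extends Thread {
    // Name of the thread that most recently executed eventObserved().
    volatile String lastCaller;

    WorkerThread() {
        super("worker");
    }

    public void eventObserved(Object toProcess) {
        lastCaller = Thread.currentThread().getName();
    }

    @Override
    public void run() {
        // The worker's own thread executes only what run() itself calls.
    }
}

class CallerThreadDemo {
    // Returns the name of the thread that executed eventObserved().
    static String whoRuns() {
        WorkerThread worker = new WorkerThread();
        Thread producer = new Thread(() -> worker.eventObserved("msg"), "producer");
        worker.start();
        producer.start();
        try {
            producer.join();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.lastCaller;
    }
}
```

Calling CallerThreadDemo.whoRuns() reports the producer thread's name, because the producer made the call; the worker thread never touches eventObserved().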

Your design is likely flawed.

If you have a producer thread and multiple consumer
threads, then you need something like an event queue.
The producer thread places a node onto the queue. Other
consumer threads are waiting for a node to appear on
the queue. One of the consumer threads will pull the
node off of the queue and process it.

The Observer/Observed pattern doesn't work for multiple
threads. You need a synchronized event queue where the
consumer threads will wait() until the queue is non-empty,
and the producer thread will notifyAll() when it puts a
new node onto the queue.
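
As a rough illustration of the queue described above, here is a minimal sketch of a synchronized event queue built on wait() and notifyAll(). The class name EventQueue and its method names are assumptions made for this example, and the generic type syntax assumes a J2SE 5.0 or later compiler; on 1.4 the same structure works with a raw LinkedList.

```java
import java.util.LinkedList;

// Hypothetical sketch of a synchronized event queue (names are illustrative).
class EventQueue {
    private final LinkedList<Object> nodes = new LinkedList<Object>();

    // Called by the producer thread: append a node and wake waiting consumers.
    public synchronized void put(Object node) {
        nodes.addLast(node);
        notifyAll();
    }

    // Called by a consumer thread: block until a node is available, then remove it.
    public synchronized Object take() throws InterruptedException {
        // Loop rather than a single check, to guard against spurious wakeups
        // and against another consumer having taken the node first.
        while (nodes.isEmpty()) {
            wait();
        }
        return nodes.removeFirst();
    }
}
```

Note that each node is delivered to exactly one of the waiting consumers, which is the behaviour described in this post.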

If you need some mutex classes, you can download the
source at http://mindprod.com/products.html, look for
the Mutex download.

Also, J2SE 5.0 has new interfaces and classes (in the
java.util.concurrent packages) that are very similar to
the mutex download (see above).

Hope this helps.

--
----------------------------
Jeffrey D. Smith
Farsight Systems Corporation
24 BURLINGTON DRIVE
LONGMONT, CO 80501-6906
http://www.farsight-systems.com
z/Debug debugs your Systems/C programs running on IBM z/OS for FREE!
 
John C. Bollinger

Jeff said:
For a specific producer to distribute to several consumers, I have a simple
extension of HashSet of consumers. However, a problem sometimes occurs
while a consumer is removing himself from the HashSet.

I don't see the point of extending HashSet here. I think you are
placing responsibilities on your extended version that more properly
belong on the producer object.

Jeff said:
The problem occurs when distribute() calls the .eventObserved() method of
the consumer that is trying to remove himself. The .eventObserved() method
never returns. I think that is because the consumer's thread is waiting in
ConsumerSet's remove().

It is conceivable that you are getting deadlocks this way, but it would
depend on how the consumer's eventObserved() method was written. If I
interpret your code and comments rightly, then eventObserved() will be
executed by the producer's thread, whereas ConsumerSet.remove() will be
executed by the consumer's thread.

Jeff said:
The producing caller and consumer are always
different threads.

This is causing you confusion. Do not put application logic into Thread
subclasses; use Runnables instead. This gives you a more consistent
object model in the first place, but more importantly, it tends to
reduce confusion about who can do what, when, and to whom.

Jeff said:
To solve the problem, I'm considering having remove() spawn a thread to do
the remove so that remove does not wait on synchronized. distribute() is
called A LOT, but remove() is called rarely.

No, don't. Throwing more threads at a synchronization problem just
makes for a messier synchronization problem.

Jeff said:
Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
Is there a better solution, or at least more Java-like?

public class ConsumerSet extends HashSet {
public void distribute(Object message) {
synchronized( this ) {
for (Iterator i = iterator(); i.hasNext();)
((Consumer) i.next()).eventObserved(message);
}
}

The distribute() method belongs on your producer object, not on the set.
The set should not be exposed directly to consumer objects; instead
they should register themselves with the producer, which will add them
to the Set. This obviates any need for type-safe add() and remove()
methods, so this whole class becomes superfluous.

Jeff said:
public void add( Consumer consumer ) {
synchronized (this) {
super.add( consumer );
}
}

public void remove( Consumer consumer ) {
synchronized ( this ) {
super.remove( consumer );
}
}
} // ConsumerSet

// usually be an extension of Thread

Should NOT be an extension of Thread. Probably should not even be an
implementation of Runnable. May be an object shared between the
producer thread and some other thread.

Jeff said:
public interface Consumer {
/**
* Invoked when an event is observed
*/
public void eventObserved( Object toProcess );
}

Your producer should look something like this:

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class Producer implements Runnable {

    private Set consumerSet = new HashSet();

    public void registerConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.add(c);
        }
    }

    public void unregisterConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.remove(c);
        }
    }

    protected void fireEvent(Object event) {
        synchronized (consumerSet) {
            // Deliver the event to every registered consumer while holding the lock.
            for (Iterator it = consumerSet.iterator(); it.hasNext(); ) {
                ((Consumer) it.next()).eventObserved(event);
            }
        }
    }

    public void run() {
        // do stuff that ends up invoking fireEvent() periodically
    }
}

(Rather resembles your ConsumerSet, doesn't it?)

Encapsulating the consumer set in this way prevents any unexpected
synchronization on it that might cause deadlock. Your documentation for
the Consumer interface should remark that its eventObserved() method
must execute quickly and that the scope of its execution must not
include any attempt to unregister the consumer (which would not
deadlock, but might throw a ConcurrentModificationException).

You should also keep in mind that registering and unregistering event
listeners (err... consumers) can block on completion of fireEvent(), and
therefore no thread should invoke registerConsumer() or
unregisterConsumer() while holding the monitor for an object that the
relevant Consumer's eventObserved() method needs to lock. [That is very
likely what is causing your deadlock now.] In particular, it may not be
possible to prevent a Consumer from observing events after a request to
unregister it has been dispatched (but before the unregisterConsumer()
returns).
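
One variation, not proposed in this post but consistent with its caveats, is to copy the consumer set while holding the lock and then deliver events from the copy with the lock released. The sketch below assumes hypothetical names (SnapshotProducer, consumerCount) and uses generics for brevity. With this approach a consumer can unregister itself from inside eventObserved() without a ConcurrentModificationException, and registration no longer blocks on slow consumers. The trade-off is the one already noted above: a consumer may still observe an event shortly after it has been unregistered.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

interface Consumer {
    void eventObserved(Object toProcess);
}

// Hypothetical variant of the Producer above: iterate over a snapshot.
class SnapshotProducer {
    private final Set<Consumer> consumerSet = new HashSet<Consumer>();

    public void registerConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.add(c);
        }
    }

    public void unregisterConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.remove(c);
        }
    }

    // Illustrative helper so callers can inspect the registration count.
    public int consumerCount() {
        synchronized (consumerSet) {
            return consumerSet.size();
        }
    }

    public void fireEvent(Object event) {
        List<Consumer> snapshot;
        synchronized (consumerSet) {
            // Hold the lock only long enough to copy the current registrations.
            snapshot = new ArrayList<Consumer>(consumerSet);
        }
        // Callbacks run without the lock, so they may register or unregister freely.
        for (Consumer c : snapshot) {
            c.eventObserved(event);
        }
    }
}
```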


John Bollinger
 
Jeff

Thanks for the thoughtful response. I need to clarify the problem.

ALL consumers must consume the object. It's really a network multicast
requirement in software. The queue implementation fails to achieve this
requirement because the first consumer dequeues the object. The second
consumer also needs the object, but it's gone from the queue.

I considered the 1.5 enhancements, but I would rather not move to 1.5 yet.
 
xarax

Jeff said:
Thanks for the thoughtful response. I need to clarify the problem.

ALL consumers must consume the object. It's really a network multicast
requirement in software. The queue implementation fails to achieve this
requirement because the first consumer dequeues the object. The second
consumer also needs the object, but it's gone from the queue.

I considered the 1.5 enhancements, but I would rather not move to 1.5 yet.

Please don't top post.

It would seem that the consumable object must be posted
to each and every consumer queue via the distribute()
method. Then each consumer thread can process the consumable.
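
A minimal sketch of that fan-out, under stated assumptions: the class name Multicaster and the subscribe()/unsubscribe() methods are invented for this example, and it relies on java.util.concurrent (LinkedBlockingQueue, CopyOnWriteArrayList), which requires J2SE 5.0. On 1.4 a hand-written wait()/notifyAll() queue per consumer could stand in for LinkedBlockingQueue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: every consumer owns a queue, and distribute()
// posts the same message to each one.
class Multicaster {
    // Copy-on-write list: cheap to iterate often, costlier to modify rarely.
    private final List<BlockingQueue<Object>> mailboxes =
            new CopyOnWriteArrayList<BlockingQueue<Object>>();

    // A consumer calls this once and then take()s from the returned queue
    // on its own thread.
    public BlockingQueue<Object> subscribe() {
        BlockingQueue<Object> mailbox = new LinkedBlockingQueue<Object>();
        mailboxes.add(mailbox);
        return mailbox;
    }

    public void unsubscribe(BlockingQueue<Object> mailbox) {
        mailboxes.remove(mailbox);
    }

    // Called by the producer thread; never blocks because the queues are unbounded.
    public void distribute(Object message) {
        for (BlockingQueue<Object> mailbox : mailboxes) {
            mailbox.add(message);
        }
    }
}
```

Because the producer only enqueues, it never runs consumer code and never waits on a consumer, so a consumer can unsubscribe from its own thread at any time. The copy-on-write list suits the stated usage, where distribute() is called a lot and removal is rare.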

Of course, by now you understand that extending Thread
to add the eventObserved() method is worse than useless,
because it clouds the semantics of your design, doesn't
offer any functionality to the Thread class, and can
confuse the casual reader into thinking that somehow
the target thread is processing the method call.
 
