Making one or more threads wait for another to produce a value or fail

Tom Anderson

The scenario:

Penelope is a widow, or at least her husband isn't around any more (she's
not sure which; long story). There are 108 suitors who would like to marry
her. She hasn't decided which one she'll marry. So, the 108 suitors are
sitting about waiting for her to decide. It's possible that instead of
deciding to marry one of them, she'll deliver some other, exceptional,
verdict (eg "turns out my husband is still alive, and will now murder you
all").

Penelope is a thread, as are her suitors. Penelope has plenty to do after
she delivers her verdict, so the verdict is not a return value - it's a
value she'll pass to a method. In code, this looks something like:

class Penelope implements Runnable {
    public void run() {
        try {
            Verdict v = ... ;
            DELIVER(v);
        }
        catch (Exception e) {
            DELIVER_EXCEPTION(e);
        }
    }
}

class Suitor implements Runnable {
    public void run() {
        try {
            Verdict v = AWAIT();
        }
        catch (Exception e) {
            // alas
        }
    }
}

There has got to be something in java.util.concurrent that she can use to
deliver her verdict. What?

In terms of synchronisation, CountDownLatch with a count of 1 does it -
the suitors await, and Penelope counts down. But it has no way to pass a
value.

A blocking queue does synchronisation and delivers a value, but it only
delivers the value once - if the suitors all queue up on take(), only the
first will get the verdict.
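
To make the limitation concrete, here is a minimal sketch. The class and method names are invented for illustration, and poll with a timeout stands in for take() so that the demo cannot hang. One verdict goes onto the queue and two readers try to fetch it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name is made up for this sketch.
class QueueDeliversOnce {
    /** Puts one verdict on the queue, then lets two "suitors" try to read it. */
    static String[] readTwice() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("I choose nobody");
        // poll with a timeout is used instead of take() so the second read returns
        String first = queue.poll(100, TimeUnit.MILLISECONDS);
        String second = queue.poll(100, TimeUnit.MILLISECONDS);
        return new String[] { first, second };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = readTwice();
        System.out.println("first reader:  " + seen[0]);
        System.out.println("second reader: " + seen[1]); // null: the value is already gone
    }
}
```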

A Future<Verdict> looks ideal - it provides synchronisation, and a value,
and provides the same value to all requesters once it's delivered, and
also handles failure and cancellation. But i don't see an easy way to make
one for a simple value. There is FutureTask, but that seems more geared to
wrapping Callable and Runnable.

Any suggestions?

Thanks,
tom
 
markspace

A blocking queue does synchronisation and delivers a value, but it only
delivers the value once - if the suitors all queue up on take(), only
the first will get the verdict.

You could get around this by adding all threads to a list, and
interrupting all the ones to get the exceptional verdict. Are there any
values associated with the exceptional verdict?

Also, what happens if a new suitor appears just as, or just after, the
widow delivers her verdict? Do new or late suitors get the exceptional
verdict, or do they get queued up for some other process?

And lastly, are the verdicts immutable, in terms of being an immutable
Java object? Can I just make a couple of objects (one verdict, one
exception) and hand them out to all and sundry?


It's the end of the month, and I'm packing to move. I might not get
back to this thread for a couple of days. In the meantime, good luck.
 
Tom Anderson

Personally, I'd just use the Object.wait() and Object.notifyAll() methods.

That probably is good enough. There are some subtleties: one has to be
aware of the possibility of spurious wakeup, and guard the wait() with a
suitable test; one has to think a bit about ensuring a happens-before
relationship between the setting of the result or exception variables and
their reading; there may be other things so subtle i haven't thought of
them. The nice thing about classes in java.util.concurrent is that they
come with an implicit contract that Doug Lea has already thought about the
subtleties for you. I love it when he does that.
I also don't see why FutureTask<Verdict> doesn't work for you (Future<V>
is just an interface…FutureTask<V> implements that interface);

FutureTask<Verdict> demands a Callable<Verdict>, which i'm not in a
position to supply. I am currently trying an approach using a do-nothing
Callable, but i am not happy about it.
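
One way the do-nothing-Callable approach might look is sketched below. The class name SettableFuture is hypothetical. The sketch relies on FutureTask declaring set and setException as protected methods, which a subclass can override with public visibility; the Callable passed to the constructor exists only to satisfy it and is never run.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical name; a sketch of the "do-nothing Callable" workaround.
class SettableFuture<V> extends FutureTask<V> {
    SettableFuture() {
        // FutureTask insists on a Callable; this one is never meant to run.
        super(new Callable<V>() {
            public V call() {
                throw new IllegalStateException("not meant to be run");
            }
        });
    }

    // Widened from protected to public so another thread can deliver the value.
    @Override
    public void set(V value) {
        super.set(value);
    }

    // Widened from protected to public so another thread can deliver a failure.
    @Override
    public void setException(Throwable t) {
        super.setException(t);
    }
}
```

An event listener could then call set(verdict) or setException(e), while the waiting threads call get() as on any other Future.
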
just because you have more processing to do after delivering the value,
that doesn't necessarily mean that processing has to occur in the same
thread, does it?

In my case, Penelope is actually a framework thread running an event loop.
When the verdict arrives, it will be delivered to an event listener i
supply, but there is no way to get from there to returning a value from a
Callable.

Well, i suppose the listener could post the result to a BlockingQueue,
from which it would be read by a thread in a Callable, which could then
return it to its executor to fulfil a Future. But that seems a little
baroque.
(Alternatively, you could provide a custom implementation of Future<V>
that doesn't wrap a Runnable like FutureTask<V>, letting your thread
continue even after delivering the Future<V>…but such an implementation
would be more complicated than just using the wait()/notifyAll()
pattern,

Not *that* much more complicated, i don't think. Both varieties of get map
on to calls to their corresponding variety of wait in a loop guarded by a
call to isDone, and isDone maps on to a check on whether the verdict is
delivered. cancel/isCancelled would require code over and above the
simplest wait/notify implementation, but not a lot.
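
A rough sketch of that mapping follows. It is not the Promise.java linked elsewhere in the thread; the class name and the deliver/fail methods are assumptions made for illustration. Both get variants wait in a loop guarded by the done flag, and cancel is treated as one more way of completing.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class; all fields are guarded by the monitor of "this".
class WaitNotifyFuture<V> implements Future<V> {
    private boolean done;
    private boolean cancelled;
    private V value;
    private Throwable failure;

    /** Delivers the normal result; returns false if already completed. */
    public synchronized boolean deliver(V v) {
        if (done) return false;
        value = v;
        done = true;
        notifyAll();
        return true;
    }

    /** Delivers the exceptional result; returns false if already completed. */
    public synchronized boolean fail(Throwable t) {
        if (done) return false;
        failure = t;
        done = true;
        notifyAll();
        return true;
    }

    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
        if (done) return false;
        cancelled = true;
        done = true;
        notifyAll();
        return true;
    }

    public synchronized boolean isCancelled() { return cancelled; }

    public synchronized boolean isDone() { return done; }

    public synchronized V get() throws InterruptedException, ExecutionException {
        while (!done) wait();            // the loop guards against spurious wakeups
        return result();
    }

    public synchronized V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!done) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) throw new TimeoutException();
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return result();
    }

    // Called only with the monitor held and done == true.
    private V result() throws ExecutionException {
        if (cancelled) throw new CancellationException();
        if (failure != null) throw new ExecutionException(failure);
        return value;
    }
}
```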

The thing that bugs me is that if this is so simple, and as generally
useful as i think it is, why isn't it in the JDK already?
so probably not worth the effort unless you expected to reuse the
implementation a lot).

Perhaps. Although implementing Future might be useful for documentation
purposes, because it immediately makes it clear what many of the methods
do.
It would be helpful if you could elaborate on why neither of those
simple, straightforward approaches satisfy your goals.

I hope i've explained myself a bit better now. I think writing my own
implementation of Future, as you suggest (well, as you imply - what you
actually suggest is not writing my own implementation of Future), may be a
good idea.

tom
 
Paul Cager

One way to do the synchronization would be a semaphore that is initially
zero, but with a large number of permits added when Penelope calls a
setVerdict method. The getVerdict method that the suitors call would
wait to get a permit, record the verdict, and put the permit back so
there is no possibility of running out of permits.
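
A minimal sketch of the semaphore idea, assuming a single call to setVerdict. The class name is hypothetical, and the permit count of Integer.MAX_VALUE / 2 is just one reading of "a large number".

```java
import java.util.concurrent.Semaphore;

// Hypothetical class illustrating the semaphore approach described above.
class SemaphoreVerdict<V> {
    private final Semaphore permits = new Semaphore(0); // nobody may pass yet
    private V verdict; // written before release(), read after acquire()

    /** Assumed to be called exactly once, by the deciding thread. */
    public void setVerdict(V v) {
        verdict = v;
        permits.release(Integer.MAX_VALUE / 2); // "a large number" of permits
    }

    public V getVerdict() throws InterruptedException {
        permits.acquire();          // blocks until setVerdict has released permits
        try {
            return verdict;
        } finally {
            permits.release();      // put the permit back for later callers
        }
    }
}
```

Because every reader returns its permit, the size of the initial release only affects how many readers can pass at the same time, not how many can pass in total.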

Could you not subclass CountDownLatch and add a "verdict" property?
 
Tom Anderson

You could go around this by adding all threads to a list, and
interrupting all the ones to get the exceptional verdict.

Interruption doesn't quite line up with the exceptional verdict.
Interruption is being told to stop waiting before there is a verdict; the
exceptional verdict is learning that there won't be a verdict.
Are there any values associated with the exceptional verdict?

Yes. Well, not the same kind as for a normal verdict, but there could be a
reason, in the form of an exception.
Also, what happens if a new suitor appears just as, or just after, the
widow delivers her verdict? Do new or late suitors get the exceptional
verdict, or do they get queued up for some other process?

They get the same verdict as everyone else. Once the verdict is reached,
it's set in stone.
And lastly, are the verdicts immutable, in terms of being an immutable
Java object? Can I just make a couple of objects (one verdict, one
exception) and hand them out to all and sundry?

Possibly. In my particular real case, yes. I'd be happy to leave that to
the user of the mechanism.

This is where i've got to as of the end of today - hacked out and tidied,
but not actually tested:

http://urchin.earth.li/~twic/Code/Promise.java

It's a simple implementation of a Future.
It's the end of the month, and I'm packing to move. I might not get
back to this thread for a couple of days. In the meantime, good luck.

And to you!

tom
 
Deeyana

Not *that* much more complicated, i don't think. Both varieties of get
map on to calls to their corresponding variety of wait in a loop guarded
by a call to isDone, and isDone maps on to a check on whether the
verdict is delivered. cancel/isCancelled would require code over and
above the simplest wait/notify implementation, but not a lot.

The thing that bugs me is that if this is so simple, and as generally
useful as i think it is, why isn't it in the JDK already?

I don't know. But it is available in at least one other JVM language's
standard library: Clojure has functions called "promise" and "deliver"
for exactly this sort of scenario. Under the hood it combines a
CountDownLatch with a class instance variable to hold the result and
implements Clojure's IDeref interface. The Java equivalent would just be
some class Promise<T> with internal value and CountDownLatch and deliver
and get methods. Deliver would decrement the CountDownLatch and set the
value cell; get would see if the CountDownLatch was zero, block if it
wasn't, and return the value cell's contents if it was. It would also
throw InterruptedException and maybe have a version of get accepting a
timeout.

Something like:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Promise<T> {
    // Set in deliver(), before the latch is released.
    private T value;
    private final CountDownLatch cdl = new CountDownLatch(1);

    public void deliver (T value) {
        this.value = value;
        cdl.countDown();
    }

    // Blocks until a value has been delivered.
    public T get () throws InterruptedException {
        cdl.await();
        return value;
    }

    // Returns null if the timeout elapses first.
    public T get (long timeout, TimeUnit unit)
            throws InterruptedException {
        if (cdl.await(timeout, unit)) {
            return value;
        } else {
            return null;
        }
    }

    // Returns the caller-supplied fallback "to" if the timeout elapses first.
    public T get (long timeout, TimeUnit unit, T to)
            throws InterruptedException {
        if (cdl.await(timeout, unit)) {
            return value;
        } else {
            return to;
        }
    }

    // Throws InterruptedException if the timeout elapses first.
    public T getEx (long timeout, TimeUnit unit)
            throws InterruptedException {
        if (cdl.await(timeout, unit)) {
            return value;
        } else {
            throw new InterruptedException();
        }
    }
}

Untested and no Javadoc but it *should* work. In particular, the
documentation for CountDownLatch says that actions in a thread prior to
calling countDown() happen-before actions following a successful return
from a corresponding await() in another thread, so the value set in
deliver should be visible to get. The first timeout-accepting get method
will return null on timeout. The second accepts a third parameter for the
object to return on timeout. The getEx method throws an
InterruptedException if it times out.

I dedicate the above code into the public domain, so that it may
expeditiously find its way into JDK 8 perhaps sometime around 2040
without legal obstacles. :)
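
For readers on Java 8 or later: the JDK did gain a class along these lines, java.util.concurrent.CompletableFuture, whose complete and completeExceptionally methods play the role of deliver. The sketch below shows how it might be used for this scenario; the class name CompletableVerdict and the helper method are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class for the Penelope scenario.
class CompletableVerdict {
    /** What a suitor would do: block for the verdict and report what happened. */
    static String await(CompletableFuture<String> verdict) throws InterruptedException {
        try {
            return "verdict: " + verdict.get();
        } catch (ExecutionException e) {
            return "alas: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final CompletableFuture<String> verdict = new CompletableFuture<>();

        // A suitor thread blocks until the verdict is delivered.
        Thread suitor = new Thread(() -> {
            try {
                System.out.println(await(verdict));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        suitor.start();

        verdict.complete("I choose Amphinomus");   // normal verdict
        suitor.join();

        // Exceptional verdict: get() throws ExecutionException wrapping the cause.
        CompletableFuture<String> doomed = new CompletableFuture<>();
        doomed.completeExceptionally(new IllegalStateException("husband is back"));
        System.out.println(await(doomed));
    }
}
```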
 
Paul Cager

I would definitely not subclass CountDownLatch.

Yes, you're right of course. Schoolboy error.
If I were to use it, the
verdict delivery class would have a CountDownLatch reference as a field,
along with fields representing the verdict.
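
A sketch of that composition, with the latch held as a field next to fields for the normal and exceptional verdicts. The class and method names are hypothetical. The synchronized deliver methods are an added assumption so that the first verdict wins and later ones are ignored.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

// Hypothetical verdict delivery class built by composition, not subclassing.
class VerdictBox<V> {
    private final CountDownLatch delivered = new CountDownLatch(1);
    private V verdict;          // written at most once, before countDown()
    private Throwable failure;  // written at most once, before countDown()

    public synchronized void deliver(V v) {
        if (delivered.getCount() == 0) return; // already decided: ignore
        verdict = v;
        delivered.countDown();
    }

    public synchronized void deliverException(Throwable t) {
        if (delivered.getCount() == 0) return; // already decided: ignore
        failure = t;
        delivered.countDown();
    }

    /** Blocks until a verdict of either kind has been delivered. */
    public V await() throws InterruptedException, ExecutionException {
        delivered.await();
        if (failure != null) throw new ExecutionException(failure);
        return verdict;
    }
}
```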

Interestingly we've not been told much about Penelope's character.
Maybe a publish / subscribe model would be a better fit.
 
Lawrence D'Oliveiro

Penelope is a widow, or at least her husband isn't around any more (she's
not sure which; long story). There are 108 suitors who would like to marry
her. She hasn't decided which one she'll marry. So, the 108 suitors are
sitting about waiting for her to decide. It's possible that instead of
deciding to marry one of them, she'll deliver some other, exceptional,
verdict (eg "turns out my husband is still alive, and will now murder you
all").

Penelope is a thread, as are her suitors. Penelope has plenty to do after
she delivers her verdict, so the verdict is not a return value - it's a
value she'll pass to a method.

procedure penelope_and_her_suitors is

    subtype verdict_string is
        string(1 .. 12);

    protected verdict is

        entry deliver(v : in verdict_string);
        entry obtain(v : out verdict_string);

    private
        v : verdict_string;
        got_v : boolean := false;
    end verdict;

    protected body verdict is

        entry deliver(v : in verdict_string) when not got_v is
        begin
            verdict.v := v;
            got_v := true;
        end deliver;

        entry obtain(v : out verdict_string) when got_v is
        begin
            v := verdict.v;
        end obtain;

    end verdict;

    task penelope is
    end penelope;

    task type suitor is
    end suitor;

    task body penelope is

    begin
        --- think about what verdict to deliver
        verdict.deliver("you all die "); -- or whatever
    end penelope;

    task body suitor is

        the_verdict : verdict_string;

    begin
        verdict.obtain(the_verdict);
        -- do whatever with it
    end suitor;

    suitors : array (1 .. 108) of suitor;

begin -- penelope_and_her_suitors
    null; -- wait for all the fun to finish
end penelope_and_her_suitors;
 
John B. Matthews

<https://groups.google.com/forum/#!topic/comp.lang.java.programmer/PvSa1FPX6as/discussion>

In this example, synchronization hinges on the entries of the protected
object [1], verdict. The suitors queue on the obtain entry, waiting for
the verdict. When penelope delivers the verdict, the barrier got_v is
changed to allow the suitors to proceed. Ada protected types [2] are a
common way to provide synchronized access to the private data of
objects.

I'm wary of a too-literal translation; but, if I understand the memory
consistency effects of CountDownLatch [3] correctly, penelope could
establish the verdict and invoke countDown(), knowing that any suitors
returning from await() would see the correct value.

[1]<http://www.ada-auth.org/standards/12rm/html/RM-9-4.html>
[2]<http://www.adaic.org/resources/add_content/standards/95rat/rat95html/rat95-p1-2.html#9>
[3]<http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html>
 
Lawrence D'Oliveiro

The concurrency stuff in particular seems a bit haphazard to me in terms
of what features it provides, but then I've found that to be true in any
other concurrency API I've used ...

How about Ada? (Considered robust and trustworthy enough to implement the
life-support system on the International Space Station.)

Or, going further back, Occam (based on Hoare’s CSP)?
 
John B. Matthews

Lawrence D'Oliveiro said:
How about Ada? (Considered robust and trustworthy enough to implement
the life-support system on the International Space Station.)

Ada provides excellent high-level support for concurrency, but it too
had to evolve [*]. Moreover, any particular implementation can be
defective.

In the present case, your Ada example was insightful; but without
further elucidation, it may be seen as malapropos.
Or, going further back, Occam (based on Hoare’s CSP)?

[*]<http://www.adaic.org/resources/add_content/standards/95rat/rat95html/rat95-p1-2.html#9>
 
Tom Anderson

Spurious wakeup, yes (why people continue to tolerate that in Java, I have no
idea…plenty of other APIs with concurrency support don't have that trouble).

Happens-before/after, should come directly from a correct use of the wait()
and notifyAll() methods.

Or volatile. Working out when you can use each is one of the challenges.
In particular, as long as you ensure that the writer doesn't get to
arrive at its "synchronized" statement in which it will call notify()
until after all the readers have arrived at their wait() statement,
you're assured all the readers will see the new value.

That is true, but that's a more stringent requirement than is needed: the
happens-before relationship is between the writer's lock release, and the
reader's lock acquisition. The reader's prospective wait must be inside a
synchronized block, so even if a reader arrives at its wait statement long
after the writer has called notify() (and unlocked) its lock acquisition
will mean the writer's actions will have happened-before that moment.

I imagine you knew that, of course, and we're talking slightly at
cross-purposes.
Alternatively, maintain a separate "do I really need to wait?" flag that
is cleared when the delivered value is set, so that readers don't bother
waiting if the value's already been updated.

Yes. Which you need anyway, to deal with the spurious wakeups!
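
The points above can be sketched as a guarded wait. The class is hypothetical, and its flag is set on delivery rather than cleared as in the quoted suggestion. A reader that arrives long after deliver() still acquires the lock, so it sees the flag and the value without waiting at all.

```java
// Hypothetical holder showing the guarded-wait idiom.
class GuardedVerdict {
    private final Object lock = new Object();
    private boolean delivered;   // the "do I really need to wait?" flag
    private String verdict;

    void deliver(String v) {
        synchronized (lock) {
            verdict = v;
            delivered = true;
            lock.notifyAll();
        }   // releasing the lock publishes both fields
    }

    String await() throws InterruptedException {
        synchronized (lock) {        // acquiring the lock sees earlier writes
            while (!delivered) {     // re-checked after every wakeup, spurious or not
                lock.wait();
            }
            return verdict;
        }
    }
}
```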

tom
 
Tom Anderson

I don't know. But it is available in at least one other JVM language's
standard library: Clojure has functions called "promise" and "deliver"
for exactly this sort of scenario. Under the hood it combines a
CountDownLatch with a class instance variable to hold the result and
implements Clojure's IDeref interface. The Java equivalent would just be
some class Promise<T>

I assume you didn't see it, but shortly before you posted, i posted this:

http://urchin.earth.li/~twic/Code/Promise.java

Great minds think alike.
with internal value and CountDownLatch and deliver and get methods.
Deliver would decrement the CountDownLatch and set the value cell; get
would see if the CountDownLatch was zero, block if it wasn't, and return
the value cell's contents if it was. It would also throw
InterruptedException and maybe have a version of get accepting a
timeout.

I used wait/notify rather than a CountDownLatch, but that's more or less
exactly what my class does.

tom
 
Lawrence D'Oliveiro

John B. Matthews said:
Moreover, any particular implementation can be defective.

GNAT seems quite good.
In the present case, your Ada example was insightful; but without
further elucidation, it may be seen as malapropos.

What sort of elucidation?
 
markspace

The scenario:

Penelope is a widow, or at least her husband isn't around any more
(she's not sure which; long story). There are 108 suitors who would like
to marry her. She hasn't decided which one she'll marry. So, the 108
suitors are sitting about waiting for her to decide. It's possible that
instead of deciding to marry one of them, she'll deliver some other,
exceptional, verdict (eg "turns out my husband is still alive, and will
now murder you all").


Just popping in quickly. First, everyone's all about CountDownLatch.
Does this really work? It requires a fixed number of threads supplied
on creation to "fire," iirc.
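
On that question: as the CountDownLatch Javadoc describes it, the count given at construction is the number of countDown() calls required, not the number of waiting threads, and any number of threads may block in await(). A small sketch (class and method names are made up) that releases many waiters with a single signal:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: one latch with a count of 1, many waiting threads.
class LatchFanOut {
    /** Starts the given number of waiters and returns how many were released. */
    static int releaseAll(int waiters) throws InterruptedException {
        final CountDownLatch verdictReady = new CountDownLatch(1);
        final AtomicInteger released = new AtomicInteger();
        Thread[] suitors = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            suitors[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        verdictReady.await();
                        released.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            suitors[i].start();
        }
        verdictReady.countDown();            // a single signal
        for (Thread t : suitors) t.join();   // wait for every suitor to finish
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseAll(108) + " suitors released");
    }
}
```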

Second what about other java.util.concurrent classes? Both locks (esp
ReadWriteLock) and AbstractQueuedSynchronizer look promising.

<http://download.oracle.com/javase/6...current/locks/AbstractQueuedSynchronizer.html>

Especially the second example for AQS, which is "a latch class that is
like a CountDownLatch except that it only requires a single signal to
fire. Because a latch is non-exclusive, it uses the shared acquire and
release methods."
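
Along the lines of that Javadoc example, a one-shot latch that also carries a value might be sketched as follows. The ValueLatch name and its value field are assumptions added for illustration, and the sketch assumes deliver is called only once.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class: a single-signal latch, in the style of the AQS Javadoc
// latch example, extended with a field for the delivered value.
class ValueLatch<V> {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        @Override
        protected int tryAcquireShared(int ignored) {
            return isSignalled() ? 1 : -1;   // succeed only once signalled
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);                     // open the latch for good
            return true;
        }
    }

    private final Sync sync = new Sync();
    private V value; // written before releaseShared, read after acquireShared

    /** Assumed to be called once; stores the value and releases all waiters. */
    public void deliver(V v) {
        value = v;
        sync.releaseShared(1);
    }

    /** Blocks until deliver has been called, then returns the value. */
    public V await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
        return value;
    }
}
```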


Lastly, I must be dense, I have a simple implementation but I think it's
wrong. What is wrong with it? I'm sure I've missed something in the
original specification, but I'm drawing a blank. How is the exceptional
verdict delivered? There's also no way to set a verdict for one thread
vs setting a return value for all threads. I think your initial
specification of using just "await()" and "deliver()" might be a bit too
simple for the problem.

This is just a simple class that sends a single message (the verdict) to
all threads (i.e., it broadcasts the message). Note this is untested.


public class BroadcastSynchronizer<V, E extends Throwable> {

    private volatile V verdict;
    private final Object lock = new Object();

    public V await() throws E, InterruptedException {
        while( verdict == null ) {
            synchronized( lock ) {
                if( verdict == null )
                    lock.wait();
            }
        }
        return verdict;
    }

    public void deliver( V verdict ) {
        this.verdict = verdict;
        synchronized( lock ) {
            lock.notifyAll();
        }
    }
}
 
markspace

Try #2. I've added "deliverOne()" and "deliverAll()" to distinguish
between a message sent to a single thread, and a message sent to all
threads.

However, this doesn't seem to meet your requirement for an exception.
So I've added also "deliverException()" which is basically the same as
"deliverAll()" except with the semantic that the receiver sees an
exception thrown rather than seeing a return value.

Again this is untested. Battery running low! Back later!


package test;

public class BroadcastSynchronizer<V, E extends Throwable> {

    private volatile E exception;
    private volatile V verdict;
    private final Object lock = new Object();

    public V await() throws E, InterruptedException {
        while( verdict == null && exception == null ) {
            synchronized( lock ) {
                if( verdict == null && exception == null )
                    lock.wait();
            }
        }
        if( exception != null ) {
            throw exception;
        }
        return verdict;
    }

    public void deliverAll( V verdict ) {
        this.verdict = verdict;
        synchronized( lock ) {
            lock.notifyAll();
        }
    }

    public void deliverOne( V verdict ) {
        this.verdict = verdict;
        synchronized( lock ) {
            lock.notify();
        }
    }

    public void deliverException( E exception ) {
        this.exception = exception;
        synchronized( lock ) {
            lock.notifyAll();
        }
    }
}
 
