Will Future<?>.get() release the object monitor?

William Rose

Hello,

I want a thread pool controller like an ExecutorService that has the
ability to stop all active tasks and suspend the adding of new tasks,
before starting afresh and adding more tasks. The purpose is to create
an interruptible background task (comprised of parallelisable sub-tasks
performed by the thread pool attached to the ExecutorService) that can
be stopped and restarted as the project is updated.

I am developing this new class, which delegates to the ExecutorService,
because ExecutorService doesn't provide the reset-able behaviour I want,
and only supports a once-off shutdown. So my prototype code (below) is
intended to do this. But it doesn't, at least in part because calling
Future<?>.get() suspends the thread but does not release the current
object's monitor (like wait() would). This leads to a deadlock in the
current code.
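
To make the symptom concrete, here is a minimal sketch, separate from the
prototype further down, of what I believe is going on. The class name
MonitorHeldDemo, the method probeEnteredDuringGet() and the 500 ms delay
are made up for illustration. One thread holds a monitor and blocks in
get(), while a second "probe" task tries to enter the same monitor:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class MonitorHeldDemo {
    /**
     * Returns true if a second thread managed to enter the monitor
     * while the caller was blocked in Future.get().
     */
    public static boolean probeEnteredDuringGet() throws Exception {
        final Object lock = new Object();
        final AtomicBoolean entered = new AtomicBoolean(false);
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // A slow task, so that get() really has to block for a while
            Future<?> slow = pool.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    Thread.sleep(500);
                    return null;
                }
            });
            synchronized (lock) {
                // The probe needs the same monitor this thread is holding
                pool.submit(new Runnable() {
                    public void run() {
                        synchronized (lock) {
                            entered.set(true);
                        }
                    }
                });
                slow.get(); // blocks, but unlike lock.wait() keeps the monitor
                return entered.get();
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run to completion
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("probe entered during get(): " + probeEnteredDuringGet());
    }
}
```

The probe can only get in once the synchronized block has been left, so
the method returns false. Replacing get() with a lock.wait() / notify()
pair would let the probe in, which is the behaviour I am after.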

So I have a few questions:
* Is there an easier way to achieve what I described first (a
reset-able ExecutorService)?
* If not, is it possible to make the condition wait triggered by get()
also lead to the release of some kind of mutex (either the object
monitor, or some other arbitrary mutex)?

If neither of the above has a good answer, I could instead be polling
the Future<?>.isDone() method to detect completion (and using
controller.wait() and a piggy-backed call to the controller.notify()
method on the end of every task), but this seems a bit more hacked up
than I had hoped.
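
For what it's worth, a rough sketch of that fallback might look like the
following. Everything in it (the class ResettableController, the
begin()/end() helpers, the epoch counter) is hypothetical and not part of
the prototype below. It counts running tasks instead of polling isDone(),
because isDone() already returns true as soon as cancel() has succeeded,
even though cancel(false) lets a task that is already running carry on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResettableController {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final List<Future<?>> submitted = new ArrayList<Future<?>>();
    private int epoch = 0;            // bumped by every reset()
    private int active = 0;           // tasks that have begun and not yet ended
    private boolean flushing = false; // true while a reset is in progress

    public synchronized void submit(final Runnable task) throws InterruptedException {
        while (flushing) {
            wait();                   // hold back new work during a reset
        }
        final int taskEpoch = epoch;
        submitted.add(executor.submit(new Runnable() {
            public void run() {
                if (!begin(taskEpoch)) {
                    return;           // a reset happened first: skip the work
                }
                try {
                    task.run();
                } finally {
                    end();            // the piggy-backed notification
                }
            }
        }));
    }

    private synchronized boolean begin(int taskEpoch) {
        if (taskEpoch != epoch) {
            return false;
        }
        active++;
        return true;
    }

    private synchronized void end() {
        active--;
        notifyAll();
    }

    public synchronized void reset() throws InterruptedException {
        epoch++;
        flushing = true;
        try {
            for (Future<?> future : submitted) {
                future.cancel(false); // unstarted tasks never run; running ones finish
            }
            submitted.clear();
            while (active > 0) {
                wait();               // releases the monitor, so end() can get in
            }
        } finally {
            flushing = false;
            notifyAll();              // wake any submit() calls that were held back
        }
    }

    public synchronized int activeCount() {
        return active;
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```

Because reset() blocks in wait() rather than get(), the monitor is free
for end() while the controller sleeps, and once reset() returns, no task
submitted before the reset is still running or will start later.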

cheers,
Will

/**
* Code follows - FutureTest.java
* Requires J2SE 5.0
*/
package com.radiologyforstudents.builder.build;

import java.text.MessageFormat;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* FutureTest
*
* @author William Rose
*/
public class FutureTest extends Thread {
    private static Log s_log = LogFactory.getLog(FutureTest.class);

    public static class Task implements Runnable {
        private static Log s_log = LogFactory.getLog(Task.class);

        private static AtomicInteger s_count = new AtomicInteger();

        private Integer _id;

        private Long _delay;

        public Task(long delay) {
            _id = s_count.incrementAndGet();
            _delay = delay;
        }

        /* (non-Javadoc)
         * @see java.lang.Runnable#run()
         */
        public void run() {
            s_log.info(MessageFormat.format("{0}:\tStarting task which will take {1} ms",
                    new Object[] { _id, _delay }));

            process();

            s_log.info(MessageFormat.format("{0}:\t Completed task",
                    new Object[] { _id }));
        }

        protected void process() {
            try {
                Thread.sleep(_delay);
            } catch (InterruptedException e) {
                s_log.error(MessageFormat.format("{0}:\t Interrupted task",
                        new Object[] { _id }));
            }
        }
    }

    private ExecutorService _executor;

    private List<Future<?>> _running;

    private boolean _flushing;

    /**
     *
     */
    public FutureTest() {
        super("Control");

        _executor = Executors.newCachedThreadPool();
        _running = new LinkedList<Future<?>>();
        _flushing = false;
    }

    public synchronized void submit(Task task) {
        _running.add(_executor.submit(task));
        notifyAll();
    }

    protected synchronized void reset() {
        if (_flushing)
            return;

        _flushing = true;

        // Try to cancel as many as possible quickly
        for (Future<?> future : _running) {
            if (!future.isDone())
                future.cancel(false);
        }

        // Wait for all the futures to finish
        for (Future<?> future : _running) {
            try {
                future.get();
            } catch (CancellationException e) {
                // Expected for the futures cancelled above
            } catch (InterruptedException e) {
                s_log.error(e);
            } catch (ExecutionException e) {
                s_log.error(e);
            }
        }

        _running.clear();
        _flushing = false;
    }

    /* (non-Javadoc)
     * @see java.lang.Thread#run()
     */
    @Override
    public synchronized void run() {
        while (!_executor.isShutdown()) {
            if (_running.size() < 1) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    s_log.error("Control interrupted while waiting for new future");
                }
            } else {
                Future<?> future = _running.get(0);

                try {
                    // Blocks while still holding this object's monitor
                    future.get();
                } catch (InterruptedException e) {
                    s_log.error("Control interrupted while waiting for future to complete");
                } catch (ExecutionException e) {
                    s_log.error("Control interrupted due to future exception");
                }
            }
        }
    }


    public synchronized void quit() {
        _executor.shutdown();
        notifyAll();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        final FutureTest controller = new FutureTest();

        controller.start();

        controller.submit(new FutureTest.Task(5000) {
            /* (non-Javadoc)
             * @see com.radiologyforstudents.builder.build.FutureTest.Task#process()
             */
            @Override
            protected void process() {
                super.process();

                controller.quit();
            }
        });
    }

}
 
