[SWING] Join thread with SwingWorker objects

H

Hole

Hi guys!

May you give me any suggestion for the following?

I have a method (issued by an actionPerformed) that instantiate a
SchedulerTask object (a SwingWorker object) and call execute() method
on it.
This execute() method instantiate N SingleTasks and call the execute()
method on them.

Now, I'd like the SchedulerTask to finish its job and return
(executing the overriden done() method inherited by SwingWorker class)
only when all N SingleTasks have completed their work. Currently, I
have that SchedulerTask finishes its work after launched the N
SingleTask(s) without waiting for their completetion.

I've tried to insert a while(finishedProcesses<N) do nothing cycle but
the GUI had dramatical issues in responsiveness and performances.

Have you ever dealt with this kind of stuff in Swing?

Below, some code (it's not consistent...only to give an idea of what I
mean..hope that it'll be useful):

private void startBatchBtnActionPerformed(java.awt.event.ActionEvent
evt) {
SchedulerTask scheduler = new SchedulerTask(batchItemsList);
scheduler.execute();
}

////////////////////////////////////////////////////////////////////////////////

class SchedulerTask extends SwingWorker<Integer, Void> implements
ActionListener {
private List<BatchExtractionItem> items;

private Timer timer;
int lastIndex;
int activeProcesses;


private final static int MAX_ACTIVE_PROCESSES = 5;

private List<BatchExtractionItem> currentProcesses = new
ArrayList<BatchExtractionItem>(MAX_ACTIVE_PROCESSES);

public SchedulerTask(List<BatchExtractionItem> items) {
this.items = items;

this.timer = new Timer(1000, this);
this.timer.start();

}

public void processCompleted(BatchExtractionItem completedItem) {
//System.out.println("Completed "+completedItem);

activeProcesses--;

}


@Override
protected Integer doInBackground() throws Exception {
lastIndex = 0;
activeProcesses = 0;
while (lastIndex<items.size()) {
while(lastIndex<items.size() &&
activeProcesses<MAX_ACTIVE_PROCESSES) {
currentProcesses.add(this.items.get(lastIndex));
lastIndex++;
activeProcesses++;
}

for (BatchExtractionItem item: currentProcesses) {
if (!item.getStatus().equals
(BatchExtractionItem.COMPLETED) && !item.getStatus().equals
(BatchExtractionItem.PROCESSING)) {
item.setStatus(BatchExtractionItem.PROCESSING);
BatchTask task = new BatchTask(item);
task.execute();
}
}
}
while (activeProcesses>0) {
//do nothing...waiting...
}
return 0;
}

@Override
protected void done() {
//popup to inform the user that batch extraction is finished
//System.out.println("completed batch");

this.timer.stop();
form.getProgressBar().setValue(100);

}

@Override
public void actionPerformed(ActionEvent e) {
//issued by Timer event

int completed = countCompleted();
int progress = completed/items.size();
setProgress(progress);
form.getProgressBar().setValue(progress);
}

private int countCompleted() {
int c=0;
for (BatchExtractionItem i: items) {
if (i.getStatus().equals(BatchExtractionItem.COMPLETED)) {
c++;
}
}
return c;
}

}

///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
class BatchTask extends SwingWorker<List<SampledValue>, Void> {
BatchExtractionItem item;

SchedulerTask scheduler;
Timer timer;



public BatchTask(BatchExtractionItem item, Batcher batcher,
SchedulerTask scheduler) {
this.item = item;
this.batcher = batcher;
this.scheduler = scheduler;

}

@Override
protected List<SampledValue> doInBackground() throws Exception {
String guid = UUID.randomUUID().toString();
List<SampledValue> results = this.batcher.batchItemExtract
(item, guid);
return results;
}

@Override
protected void done() {
//save results
scheduler.processCompleted(this.item);
List<SampledValue> resultList;
try {
resultList = get();
//print result List somewhere


}
catch (InterruptedException ignore) {
} catch (ExecutionException e) {
String why = null;
Throwable cause = e.getCause();
if (cause != null) {
why = cause.getMessage();
} else {
why = e.getMessage();
}
System.err.println("Error: " + why);
}
}
 
M

markspace

Hole said:
Now, I'd like the SchedulerTask to finish its job and return
(executing the overriden done() method inherited by SwingWorker class)
only when all N SingleTasks have completed their work. Currently, I
have that SchedulerTask finishes its work after launched the N
SingleTask(s) without waiting for their completetion.


1. You should wait. Use a countdown latch.

I've tried to insert a while(finishedProcesses<N) do nothing cycle but
the GUI had dramatical issues in responsiveness and performances.


2. I think you're doing your while loop in the "done" method. Don't do
that. Wait in doInBackground.

Have you ever dealt with this kind of stuff in Swing?


I'll try to cook up an example for you later, but those two points
should get you pointed in the right direction. I didn't read the code
you posted carefully, but after a glance a couple of things jumped out
at me.

class SchedulerTask extends SwingWorker<Integer, Void> implements
ActionListener {


"implements ActionListener" here strikes me as wrong. You want your
action listener and your worker to be two seperate objects.
SwingWorkers can't be reused, so there's no point to keeping one around.

class BatchTask extends SwingWorker<List<SampledValue>, Void> {


You only need one SwingWorker to synchronize. These secondary tasks
would be better as Runnables and handed off to an Executor.



Final thoughts: Java Concurrency in Practice is an excellent book, you
really need it for this sort of work.

<http://www.javaconcurrencyinpractice.com/>
 
D

Douwe

Where should I start :/. First of all the field
SchedulerTask.activeProcesses is accessed from different Threads so
you should make it volatile. The next piece of code is causing your
processor (or at least 1 core of it) to go to 100 %

while (activeProcesses>0) {
//do nothing...waiting...
}

you could insert a wait in there like

final Object lock = new Object(); // you can move this more to the
top of the class as a class field.
while (activeProcesses>0) {
synchronized(lock) { try { lock.wait(100l); } catch(ignore)
{} } // puts the current thread into the wait state for 100 ms (you
could also do Thread.sleep instead) but I preffer the Object.wait()
method.
}


Next is the main loop

while (lastIndex<items.size()) {
while(lastIndex<items.size() &&
activeProcesses<MAX_ACTIVE_PROCESSES) {
currentProcesses.add(this.items.get(lastIndex));
lastIndex++;
activeProcesses++;
}

for (BatchExtractionItem item: currentProcesses) {
if (!item.getStatus().equals
(BatchExtractionItem.COMPLETED) && !item.getStatus().equals
(BatchExtractionItem.PROCESSING)) {
item.setStatus(BatchExtractionItem.PROCESSING);
BatchTask task = new BatchTask(item);
task.execute();
}
}
}


This part is split in two subparts: the first part adds items to the
currentProcesses as long as the activeProcesses is under a certain
amount. The second part then runs through the list of currentProcesses
and starts BatchTasks for items that have not started yet. Now imagine
what happens when activeProcesses is exactly MAX_ACTIVE_PROCESSES then
the first part will not enter the loop and the second part will enter
the loop but see that all processes are running and thus does nothing
more then checks. These two parts than get repeated over and over and
over until one of the BatchTasks finishes. This loop will therefor
also cause the CPU (or 1 core) to got to 100%. Try implementing
something like the following piece of code



private final Object lock = new Object();
private volatile int countCompleted;
pricate volatile int activeProcesses;

public void processCompleted(BatchExtractionItem completedItem) {
//System.out.println("Completed "+completedItem);

synchronized(lock) {
activeProcesses--;
countCompleted++;
lock.notifyAll(); // wake up the main thread
}
}


enum State { WAITING, RUN_NEXT };


protected Integer doInBackground() throws Exception {

List<BatchExtractionItem> queuedItems = new
ArrayList<BatchExtractionItem>();
queuedItems.addAll(items);
activeProcesses = 0;
countCompleted = 0;
boolean keepRunning = true;
BatchExtractionItem itemToRun = null;

while (keep_running) {
switch(state) {
case WAITING : {
synchronized(lock) {
if (activeProcesses<MAX_ACTIVE_PROCESSES) {
if (queuedItems.isEmpty()) {
if (activeProcesses==0) {
keep_running = false;
break;
}
} else {
state = FIND_TO_RUN;
break;
}
}
try {
lock.wait(20000l); // wait for 20 seconds max
(or a notify from processCompleted) and then check again
} catch(Exception ignore) {}
}
} break;

case RUN_NEXT : {
BatchExtractionItem item = queuedItems.removeLast(queuedItems);
if (!item.getStatus().equals(BatchExtractionItem.COMPLETED) && !
item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
item.setStatus(BatchExtractionItem.PROCESSING);
BatchTask task = new BatchTask(item);
task.execute();
activeProcesses++;
} else {
// spitt out a warning
System.err.println("warn: next item was already processing or has
completed");
// countCompleted++; // add this if it should be counted as a
completed task
}
state = WAITING;
} break;
}
}
// no further checking needed ... all items have finised processing
return 0;
}



@Override
public void actionPerformed(ActionEvent e) {
//issued by Timer event
int progress = countCompleted/items.size();
setProgress(progress);
form.getProgressBar().setValue(progress);
}



Please note that I didn't test the code and I have edited the code
without an Java compatible IDE so you might need some small
adjustments.


Regards,
Douwe Vos
 
M

markspace

Here's what I came up with:

package swingthreads;

import java.awt.BorderLayout;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

public class Main
{

public static void main( String[] args )
{
SwingUtilities.invokeLater( new Runnable()
{
public void run()
{
createGui();
}
} );
}

private static void createGui()
{
final ExampleFrame frame = new ExampleFrame();
final Executor exe = Executors.newFixedThreadPool( 5 );

ActionListener batchKicker = new ActionListener()
{

public void actionPerformed( ActionEvent e )
{
ExampleWorker worker = new ExampleWorker( exe, 5, frame.
getOutput() );
worker.execute();
}
};

frame.addButtonListener( batchKicker );
frame.setVisible( true );
}
}

class ExampleFrame
extends JFrame
{

JTextArea output;
JButton button;

public ExampleFrame()
{
super( "Example Batch Threads" );
output = new JTextArea();
JScrollPane sp = new JScrollPane( output );
add( sp );
button = new JButton( "Start" );
JPanel panel = new JPanel( new FlowLayout( FlowLayout.CENTER ) );
panel.add( button );
add( panel, BorderLayout.SOUTH );
setLocationRelativeTo( null );
setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );
setSize( 300, 300 );
}

public JTextArea getOutput()
{
return output;
}

public void addButtonListener( ActionListener a )
{
button.addActionListener( a );
}
}

class ExampleWorker
extends SwingWorker<Void, Void>
{

Executor exe;
JTextArea output;
int tasks;

public ExampleWorker( Executor exe,
int tasks, JTextArea output )
{
this.exe = exe;
this.tasks = tasks;
this.output = output;
}

@Override
protected Void doInBackground()
throws Exception
{
CountDownLatch latch = new CountDownLatch( tasks );
for( int i = 0; i < tasks; i++ ) {
ExampleBatchTask batch =
new ExampleBatchTask( output, latch );
exe.execute( batch );
}
latch.await();
return null;
}

@Override
public void done()
{
output.append( "All done!\n" );
}
}

class ExampleBatchTask
implements Runnable
{

JTextArea output;
CountDownLatch latch;

public ExampleBatchTask( JTextArea output, CountDownLatch latch )
{
this.output = output;
this.latch = latch;
}

public void run()
{
int interval = (int) (Math.random() * 4 + 1);
// append() is thread safe!
output.append( "Started: " + interval + "\n" );
for( int i = 0; i < 5; i++ ) {
try {
Thread.sleep( interval * 1000 );
} catch( InterruptedException ex ) {
break; // exit immediately if interrupted
}
output.append( "running " + interval + "\n" );
}
latch.countDown();
}
}
 
L

Lew

Hole said:
I have a method (issued by an actionPerformed) that instantiate a
SchedulerTask object (a SwingWorker object) and call execute() method
on it.
This execute() method instantiate N SingleTasks and call the execute()
method on them.

By 'SIngleTask' I assume you mean 'BatchTask'.

It struck me that you're invoking a 'SwingWorker' from the
'doinBackground()' method of a 'SwingWorker'. Don't do that. Just
spawn a thread - you're already off the EventDispatchThread (EDT), so
the purpose of 'SwingWorker' is already achieved.

Your 'doInBackground()' method can do a 'Thread#join()' on each sub-
task after having spawned all of them. That way it's guaranteed that
all sub-tasks have completed before the parent ('SwingWorker') task
returns.
 
L

Lew

Douwe said:
Where should I start :/. First of all the field

Where should I start? The example you provided is rife with
unsynchronized access to shared data.
... Try implementing
something like the following piece of code

private final Object lock = new Object();
private volatile int countCompleted;
pricate volatile int activeProcesses;

You're controlling these variables with different monitors - sometimes
'lock', sometimes the internal one due to being 'volatile'. I'm not
convinced that the relationship between these variables is reliably
readable.
public void processCompleted(BatchExtractionItem completedItem) {
        //System.out.println("Completed "+completedItem);

        synchronized(lock) {
                activeProcesses--;
                countCompleted++;
                lock.notifyAll();       // wake up the main thread
        }

}

enum State { WAITING, RUN_NEXT };

protected Integer doInBackground() throws Exception {

        List<BatchExtractionItem> queuedItems = new
ArrayList<BatchExtractionItem>();
        queuedItems.addAll(items);
        activeProcesses = 0;
        countCompleted = 0;
        boolean keepRunning = true;
        BatchExtractionItem itemToRun = null;

        while (keep_running) {
                switch(state) {

The read of 'state' is not synchronized with the write.
                        case WAITING : {
                                synchronized(lock) {
                                        if (activeProcesses<MAX_ACTIVE_PROCESSES) {
                                                if (queuedItems.isEmpty()) {
                                                        if (activeProcesses==0) {
                                                                keep_running = false;
                                                                break;
                                                        }
                                                } else {
                                                        state = FIND_TO_RUN;
                                                        break;
                                                }
                                        }
                                        try {
                                                lock.wait(20000l);              // wait for 20 seconds max
(or a notify from processCompleted) and then check again
                                        } catch(Exception ignore) {}
                                }
                         } break;

                         case RUN_NEXT : {
                                BatchExtractionItem item = queuedItems.removeLast(queuedItems);

You don't synchronize the change to 'queuedItems'.
                                if (!item..getStatus().equals(BatchExtractionItem.COMPLETED) && !

or the 'getStatus()' read.
item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
                                        item.setStatus(BatchExtractionItem.PROCESSING);
                                        BatchTask task = new BatchTask(item);
                                        task.execute();
                                        activeProcesses++;
                                } else {
                                        // spitt out a warning
                                        System.err.println("warn: next item was already processing or has
completed");
                                        // countCompleted++;   // add this if it should be counted as a
completed task
                                }
                                state = WAITING;
                        } break;
                }
        }
        // no further checking needed ... all items have finised processing
        return 0;

}

@Override
public void actionPerformed(ActionEvent e) {
        //issued by Timer event
        int progress = countCompleted/items.size();
        setProgress(progress);
        form.getProgressBar().setValue(progress);

}

I'm having difficulty reasoning about the synchronization the way
you've written all this. I suspect there are subtle threading bugs
there.
 
J

John B. Matthews

Hole said:
May you give me any suggestion for the following? [...]
class SchedulerTask extends SwingWorker<Integer, Void> ... [...]
class BatchTask extends SwingWorker<List<SampledValue>, Void> ...

I can't see where you define Void, "the type used for carrying out
intermediate results by this SwingWorker's publish and process methods"
and symbolized by the generic parameter V in the API [1].

You might look at the examples in the SwingWorker API [1] and this
similar database example [2]. In the latter, the generic type V is
Double, and individual instances are published from the background
thread. Note that the List<Double> received by process() is _not_ the
same List<Double> returned by doInBackground(). The former contains
intermediate results; the latter is the background thread's working
result set.

The method doInBackground() can create other threads, but you still have
to synchronize their processing, as others have suggested.

[1]<http://java.sun.com/javase/6/docs/api/javax/swing/SwingWorker.html>
[2]<http://sites.google.com/site/drjohnbmatthews/randomdata>

[...]
 
L

Lew

John said:
I can't see where you define Void, "the type used for carrying out
intermediate results by this SwingWorker's publish and process methods"
and symbolized by the generic parameter V in the API [1].

You might look at the examples in the SwingWorker API [1] and this
similar database example [2]. In the latter, the generic type V is
Double, and individual instances are published from the background
thread. Note that the List<Double> received by process() is _not_ the
same List<Double> returned by doInBackground(). The former contains
intermediate results; the latter is the background thread's working
result set.

The method doInBackground() can create other threads, but you still have
to synchronize their processing, as others have suggested.

[1]<http://java.sun.com/javase/6/docs/api/javax/swing/SwingWorker.html>
[2]<http://sites.google.com/site/drjohnbmatthews/randomdata>
 
M

markspace

I can't see where you define Void,


As Lew mentioned, it's part of the java.lang package.

A trick is to define SwingWorker<Void,Void> and then return "null" from
doInBackground():

class ExampleWorker
extends SwingWorker<Void, Void>
{
...
@Override
protected Void doInBackground() {
...
return null;
}

I notice the OP is declaring "Integer" but always returning 0. "Void"
might actually suit him better.

See my example up thread.
 
J

John B. Matthews

I can't see where you define Void,
[/QUOTE]
At nearly the same time, Lew wrote:

As Lew mentioned, it's part of the java.lang package.

Ah, thank you both.
A trick is to define SwingWorker<Void,Void> and then return "null" from
doInBackground():

class ExampleWorker
extends SwingWorker<Void, Void>
{
...
@Override
protected Void doInBackground() {
...
return null;
}

I notice the OP is declaring "Integer" but always returning 0.

I wondered about this.
"Void" might actually suit him better.

See my example up thread.

Exemplary!
 
D

Douwe

You're controlling these variables with different monitors - sometimes
'lock', sometimes the internal one due to being 'volatile'.  I'm not
convinced that the relationship between these variables is reliably
readable.

the countCompleted and activeProcesses do not have to be synchronized
via the lock object they just need to be written immediately by the
JVM (so no caching). The countCompleted is only used to update the
progressbar so if the number would be off (like i.e. 1 to less) for a
few milliseconds then nobody is noticing it. On the activeProcesses I
agree the volatile can be removed (it is only read by the main
thread).

State state = State.WAITING; // forgot this line ...
The read of 'state' is not synchronized with the write.




You don't synchronize the change to 'queuedItems'.


or the 'getStatus()' read.





I'm having difficulty reasoning about the synchronization the way
you've written all this.  I suspect there are subtle threading bugs
there.

The variable state is only written and read by one thread so there is
no need for synchronization. About the part of the getStatus I have to
partly agree with you: the getStatus() internally calls the method
FutureTask.isDone() which than calls sync.innerIsDone() where the sync
is of type Sync (see:
http://www.google.com/codesearch/p?.../util/concurrent/FutureTask.java&q=FutureTask)
It seems to me that it is overly synced (but at the time of writing I
forgot to check. I assumed hole did the right job there)
 
H

Hole

As Lew mentioned, it's part of the java.lang package.

A trick is to define SwingWorker<Void,Void> and then return "null" from
doInBackground():

Thanks a lot for your contributes and suggestions, guys!
I have to go deep into (perhaps tonight) but they seem really useful.

I'll take a look to Java Concurrency in Practice too...I think that
concurrency is a key point in professional programming and I must have
a good reference, I guess...

Many thanks again..I'll post my working code revisited when done!
cheers!
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,992
Messages
2,570,220
Members
46,807
Latest member
ryef

Latest Threads

Top