Ordering Asynchronous Events

cppaddict

Assume events are being generated by players in a game. They must be
processed in order: player 1, player 2, etc. Our application, on the
other hand, is receiving them at somewhat arbitrary times. If it
receives player 2's event before player 1's, it must hold off on
processing player 2's event. In addition, if it has not received
player 1's event after a specified amount of time (after having
already received player 2's event), it should throw an error.

Below is a sample program which generates test events and processes
them. It works, but I'd like advice on how to improve its design. I
thought that maybe Doug Lea's concurrent package had some classes that
might be appropriate here.

It seems like a lot of code, but it's pretty simple. Also, the Event
and EventGenerator classes are just for creating test events, so they're
not really part of the critique I'm looking for.

Thanks in advance,
cpp

PS: If you save the code below in a single file called
"EventProcessor.java", it will compile and run.

-----CODE FOLLOWS-----
import java.util.LinkedList;
import java.util.ListIterator;

class ThreadPauser {
int mPlayer;
public ThreadPauser (int i) {
mPlayer = i;
}
public synchronized void stop() {
System.out.println(mPlayer + " waiting...");
try { wait(10); }
catch (InterruptedException ignore) {}
System.out.println(mPlayer + " waking...");
}
public synchronized void start() {
notifyAll();
}
}

public class EventProcessor {

private LinkedList mPlayersToProcess;
private LinkedList mReceivedEvents;
private ThreadPauser[] mThreadPausers;
private long mTimeBenchmark;

public EventProcessor() {
mTimeBenchmark = System.currentTimeMillis();
mPlayersToProcess = new LinkedList();
mReceivedEvents = new LinkedList();
mThreadPausers = new ThreadPauser[10];
for (int i=0; i<10; i++) {
mPlayersToProcess.add(new Integer(i));
mThreadPausers[i] = new ThreadPauser(i);
}
}

public void processEvent(Event e) {
mReceivedEvents.add(e);
int firstInLine = ( (Integer)
mPlayersToProcess.getFirst() ).intValue();
if (e.mPlayer == firstInLine) {
System.out.println("Event for position " +
e.mPlayer + " at time " + e.mTime + " -- time now = " +
(System.currentTimeMillis() - mTimeBenchmark));
mPlayersToProcess.removeFirst();
int nextInLine = (firstInLine + 1) % 10;
mThreadPausers[nextInLine].start();
}
else {
mThreadPausers[e.mPlayer].stop();
processEvent(e,true);
}
}

private void processEvent(Event e, boolean recursive) {
int firstInLine = ( (Integer)
mPlayersToProcess.getFirst() ).intValue();
if (e.mPlayer == firstInLine)
processEvent(e);
else if (haveReceivedPrecedingEvents(e.mPlayer)) {
mThreadPausers[e.mPlayer].stop();
processEvent(e,true);
}
else {
System.out.println("error!");
}
}

private boolean haveReceivedPrecedingEvents(int
playerPosition) {
ListIterator neededPlayersIter =
mPlayersToProcess.listIterator(0);
while (neededPlayersIter.hasNext()) {
int testPlayer = ( (Integer)
neededPlayersIter.next() ).intValue();
ListIterator receivedEventsIter =
mReceivedEvents.listIterator(0);
boolean gotNeededPosition = false;
while (receivedEventsIter.hasNext()) {
int thisPlayer = ((Event)
receivedEventsIter.next()).mPlayer;
if (thisPlayer == testPlayer) {
gotNeededPosition = true;
break;
}
}
if (!gotNeededPosition)
return false;
}
return true;
}

public static void main(String[] args) {
EventProcessor myProcessor = new EventProcessor();

Thread threads[] = new Thread[10];
//create the threads
for (int i=0; i<10; i++)
threads[i] = new Thread(new
EventGenerator(myProcessor,i));
//start the threads
for (int i=0; i<10; i++)
threads[i].start();

}


}

/***********************************
* Event and Event Generator
* ***********************************/
class Event {
//these are public just for convenience, since this is a test
public int mPlayer;
public int mTime;

public Event(int player, int time) {
mPlayer = player;
mTime = time;
}
}

class EventGenerator implements Runnable {

private int mPosition;
private EventProcessor mEventProcessor;

public EventGenerator(EventProcessor ep, int pos) {
mEventProcessor = ep;
mPosition = pos;
}

public void run() {
int sleepTime = (int) (10*Math.random()) ;
try { Thread.sleep( sleepTime ); }
catch (InterruptedException ignore) {}
mEventProcessor.processEvent(new
Event(mPosition,sleepTime));
}

}
 

John C. Bollinger

cppaddict said:
Assume events are being generated by players in a game. They must be
processed in order: player 1, player 2, etc. Our application, on the
other hand, is receiving them at somewhat arbitrary times. If it
receives player 2's event before player 1's, it must hold off on
processing player 2's event. In addition, if it has not received
player 1's event after a specified amount of time (after having
already received player 2's event), it should throw an error.

Below is a sample program which generates test events and processes
them. It works, but I'd like advice on how to improve its design. I
thought that maybe Doug Lea's concurrent package had some classes that
might be appropriate here.

Perhaps it is an artifact of your test setup, but the design is weird.
It would be typical to have a thread for processing events that is
separate from the threads generating events, but instead you have the
event generating threads each running the processing code. That will be
a difficult model to implement correctly, even when you switch to live
event feeds (and maybe more so then). It has a potential to exhibit
performance problems, and I would expect it to be prone to concurrency bugs.

It seems to me that you have described a situation where there needs to
be an event queue for each player, each fed by a thread (not necessarily
unique to that queue), and an event processor running in its own thread
which processes each queue in cyclic sequence. The Java Tutorial's
section on multi-threaded programming contains a producer/consumer
example that should give you a good start on setting up the queues for
correct synchronization between event producers and the event consumer.

In a setup like that, the only time you need to force a thread to
suspend is when the processing thread is waiting for an event that has
yet to arrive, and then only the processing thread needs to wait. The
event feeds do not execute any of the same code as the event processor,
and the only shared objects are the queues and the events. If you make
the events immutable then they pose no concurrency problem and you only
need to worry about the event queues themselves as far as thread safety.
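
The design described above could be sketched roughly as follows. This is only an illustration, not code from the original post: the names PlayerEvent, OrderedEventProcessor, submit and processRound are made up for the example, and it assumes java.util.concurrent (the standard-library descendant of Doug Lea's package) is available. The timeout here is simplified: it starts when the processing thread begins waiting for a given player, not when a later player's event arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Immutable event (hypothetical name), so sharing it between threads is safe.
final class PlayerEvent {
    final int player;
    final String payload;

    PlayerEvent(int player, String payload) {
        this.player = player;
        this.payload = payload;
    }
}

// One queue per player; a single consumer drains the queues in cyclic order.
class OrderedEventProcessor {
    private final List<BlockingQueue<PlayerEvent>> queues = new ArrayList<>();
    private final long timeoutMillis;

    OrderedEventProcessor(int players, long timeoutMillis) {
        for (int i = 0; i < players; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
        this.timeoutMillis = timeoutMillis;
    }

    // Called by producer threads. The queues are unbounded, so this never blocks.
    void submit(PlayerEvent e) {
        queues.get(e.player).add(e);
    }

    // Called by the one processing thread: takes one event per player, in order.
    // Only this thread ever waits, and only for an event that has not arrived yet.
    List<PlayerEvent> processRound() throws InterruptedException, TimeoutException {
        List<PlayerEvent> processed = new ArrayList<>();
        for (int p = 0; p < queues.size(); p++) {
            PlayerEvent e = queues.get(p).poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (e == null) {
                throw new TimeoutException("no event from player " + p);
            }
            processed.add(e);
        }
        return processed;
    }
}

class OrderedEventDemo {
    public static void main(String[] args) throws Exception {
        OrderedEventProcessor proc = new OrderedEventProcessor(3, 1000);
        // Producers start in reverse order to mimic out-of-order arrival.
        for (int p = 2; p >= 0; p--) {
            final int player = p;
            new Thread(() -> proc.submit(new PlayerEvent(player, "move"))).start();
        }
        for (PlayerEvent e : proc.processRound()) {
            System.out.println("processed player " + e.player);
        }
    }
}
```

Because the producers only ever call submit and the consumer only ever calls processRound, the queues are the only shared mutable state, which matches the point about thread safety above.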


John Bollinger
(e-mail address removed)
 

Paul Lutus

cppaddict said:
Assume events are being generated by players in a game. They must be
processed in order: player 1, player 2, etc. Our application, on the
other hand, is receiving them at somewhat arbitrary times. If it
receives player 2's event before player 1's, it must hold off on
processing player 2's event. In addition, if it has not received
player 1's event after a specified amount of time (after having
already received player 2's event), it should throw an error.

The solution is simple. Create a state machine with a state register that is
visible to all the threads. A thread cannot execute unless the state
register allows it to, and after the thread has executed, it increments the
state register.

Each thread tests the register and, if it is not time for that thread, it
idles, then tests again.
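
A minimal sketch of the state-register idea, for illustration only. TurnRegister and runInTurn are hypothetical names, and the sketch makes two assumptions beyond the description above: it idles with wait/notifyAll instead of a sleep-and-retest loop, and it gives up with a TimeoutException if the thread's turn does not come within the given time. It handles a single round and does not wrap back to the first player.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Shared "state register": holds the number of the player whose turn it is.
class TurnRegister {
    private int turn = 0;

    // Blocks until it is this player's turn, runs the action, then increments the register.
    synchronized void runInTurn(int player, long timeoutMillis, Runnable action)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (turn != player) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException(
                        "player " + player + " gave up waiting for player " + turn);
            }
            wait(remaining); // idle, then test the register again
        }
        action.run();
        turn++;
        notifyAll(); // wake the waiting threads so they re-test the register
    }
}

class TurnRegisterDemo {
    public static void main(String[] args) throws Exception {
        TurnRegister register = new TurnRegister();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        // Start the threads in reverse order; the register still forces 0, 1, 2, 3.
        for (int p = 3; p >= 0; p--) {
            final int player = p;
            Thread t = new Thread(() -> {
                try {
                    register.runInTurn(player, 1000, () -> order.add(player));
                } catch (InterruptedException | TimeoutException ex) {
                    System.out.println("error: " + ex.getMessage());
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(order);
    }
}
```

Note that the action runs while the lock is held, so the work itself is fully serialized.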

I must also ask -- have you considered not using threads? Forcing sequential
execution on threads calls into question the use of threads.
 
