Pattern suggestion

FrenKy

Hi *,
I have a huge file (~10GB) which I'm reading line by line. Each line has
to be analyzed by a number of different analyzers. The problem is that
the processing can be time consuming (usually because of delays caused by
external interfaces), so to get at least somewhat reasonable performance
I would need to make it heavily multithreaded.
The file should be read only once to reduce disk I/O.

So I need "1 driver to many workers" pattern where workers are
multithreaded.

I have a solution now based on Observable/Observer that I use (and it
works) but I'm not sure if it is the best way.

Any suggestion would be appreciated.
Thanks in advance!
 
Rui Maciel


What about this:

Consider three types of component: the file reader, data analyzer and a data
queue.

The data queue component acts as a data buffer. This component is
responsible for managing the data queue and supplying the data analyzer
components with data from the file. It buffers the data contained in the
file by storing it as multiple data snippets, in such a way that they can be
accessed concurrently by each data analyzer component. Once a data snippet
has been analyzed by all data analyzers, the data queue frees the resource.

The file reader component is responsible for supplying the data queue
component with data read from a file. It accomplishes this by pushing data
snippets on top of the data queue on request.

The data analyzer component requests data snippets from the data queue
component. Once the data is analyzed, the data analyzer component releases
the resource and requests more data from the file by signaling the data
queue for the next data snippet.

This scheme would, at least in theory, provide you with a way to have
multiple data analyzers analyze independent parts of a file without forcing
a set of data analyzers to wait for a component to finish processing a data
snippet. You can also avoid the observer pattern with this sort of
solution, and therefore end up with a component which is simpler to
understand and a bit more robust to change.
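A rough sketch of this scheme in Java, assuming one bounded queue per data
analyzer as the buffer and an invented LineAnalyzer interface (the queue size
and the names are just placeholders):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

interface LineAnalyzer {
    void analyze(String line);
}

class DataQueuePipeline {
    // Unique sentinel object marking the end of the input.
    private static final String EOF = new String("EOF");

    static void run(Iterable<String> lines, List<LineAnalyzer> analyzers)
            throws InterruptedException {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();
        for (LineAnalyzer analyzer : analyzers) {
            // Bounded buffer: a slow analyzer eventually blocks the reader,
            // so memory use stays bounded.
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
            queues.add(queue);
            Thread worker = new Thread(() -> {
                try {
                    for (String line = queue.take(); line != EOF; line = queue.take()) {
                        analyzer.analyze(line);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }
        // Single reader: every data snippet is handed to every analyzer's queue.
        for (String line : lines) {
            for (BlockingQueue<String> queue : queues) {
                queue.put(line);
            }
        }
        for (BlockingQueue<String> queue : queues) {
            queue.put(EOF);            // tell each worker there is no more data
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}

Once a snippet has been taken and analyzed by every worker, nothing references
it any more and the garbage collector reclaims it, which plays the role of the
"free the resource" step described above.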


Hope this helps,
Rui Maciel
 
Lew

Well, we aren't sure either.

"Best" is a very loose term, and it encompasses multiple often
mutually-exclusive metrics. What is your metric for "best"?

How do you measure it?

Even after you answer that, we don't know. We haven't seen your code. You
might have a simply brilliant implementation of what you call
"Observable/Observer" that simply blows any other approach out of the water.
Conversely, your "Observable/Observer" label may be wildly inapplicable, and
the code utter crap, and even were that the correct pattern the execution
thereof might have barely crawled out of the crapper.

We simply don't know. You may have the best, the worst, or something in
between of all possible implementations. I hope it's nearer the first than
the second.

Would you go to a nutritionist and say, "I'm on a high-carb diet, is that the
best way?" It might be, if the carbs are starchy like pasta and you're at the
shore running marathons. It might not be, if you're eating only Twinkies and
beer and watching your fourth straight /Jersey Shore/ marathon.

Tell you what. How about you share what you've done and then, based on that
information, we provide feedback?

http://sscce.org/
 
markspace

So I need "1 driver to many workers" pattern where workers are
multithreaded.

I have a solution now based on Observable/Observer that I use (and it
works) but I'm not sure if it is the best way.


Map-Reduce is the current standard pattern for big searches like these,
I believe. Check Google and Wikipedia to get started. I also recommend
O'Reilly's "Hadoop" book.

There is also Fork-Join, an older pattern that's a little more
labor-intensive in terms of code, imo.
 
Jan Burse


A couple of thoughts:

- Check whether the bottleneck is writing the
results of the analysers rather than reading the file;
you might add extra workers and queues for writing
the results.

- Check whether the analysers are all equally fast;
otherwise the least performant analyser will
delay the processing, even with some queues,
since they will be limited in size.

Bye
 
Arved Sandstrom

Patricia said:
I suggest taking a look at java.util.concurrent.ThreadPoolExecutor and
related classes.

Try to minimize ordering relationships between processing on the lines,
so that you can overlap work on multiple lines as much as possible.

I agree. With a problem description like this, java.util.concurrent is the
first thing that pops into my head. markspace mentioned map-reduce, and
there is specifically fork-join in Java 1.7 (they are similar insofar as
they are algorithms for dividing problems); I don't know if any of
that is needed here, because the line analyses may be independent. IOW, this
may not be one divisible problem, this may be millions of individual
problems.

java.util.concurrent will definitely have something. It could well be
that the processing of each line is isolated, and I'd assuredly be
thinking of ThreadPoolExecutor or something similar for managing these.
It has a lot of tuning options including queues. If the analyzers for
each line have to coordinate (and maybe there's some final processing
after all complete) there are classes for that too, like CyclicBarrier.
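For illustration, a ThreadPoolExecutor with a bounded work queue could be set
up like this (the pool size and queue capacity are only example values):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AnalyzerPool {
    // 8 worker threads and a bounded queue of pending line-analysis tasks.
    // CallerRunsPolicy makes the submitting (reader) thread run a task itself
    // when the queue is full, which throttles reading automatically.
    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(
                8, 8,                                 // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,            // keep-alive for excess threads
                new ArrayBlockingQueue<>(10_000),     // bounded task queue
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}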

AHS
 
Martin Gregorie

Yes. Since the OP gives no indication that the analysers needed for each
line can be selected by some sort of fast, simple inspection, about all you
can do is:

foreach line l
    foreach analyser a
        start a thread for a(l)
    wait for all threads to finish

At first glance you might think using a queue per analyser would help but,
with the data volumes quoted, that will soon fall apart if any analyser is
more than trivially slower than the rest. As the OP has already said that
some analysers can be much slower due to external interface delays (I
presume that means waiting for DNS queries, etc.), I think he's stuck
with the sort of logic I sketched out. After processing has gotten under
way and any analyser-specific queues have filled up, the performance of
any more complex logic will degrade to the above long before the input
has been completely read and processed.

In summary, don't try to do anything more sophisticated than the above.
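Expressed with java.util.concurrent, that simple per-line fan-out might look
roughly like this (the file name and the LineAnalyser interface are
placeholders, and a pool is reused instead of starting fresh threads for
every line):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PerLineFanOut {
    interface LineAnalyser { void analyse(String line); }

    static void process(String path, List<LineAnalyser> analysers)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(analysers.size());
        try (BufferedReader in = new BufferedReader(new FileReader(path))) {
            for (String line = in.readLine(); line != null; line = in.readLine()) {
                List<Callable<Void>> tasks = new ArrayList<>();
                for (LineAnalyser a : analysers) {
                    String current = line;                 // effectively final copy
                    tasks.add(() -> { a.analyse(current); return null; });
                }
                pool.invokeAll(tasks);   // "wait for all threads to finish" per line
            }
        } finally {
            pool.shutdown();
        }
    }
}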
 
Jan Burse

Jan said:
- Check whether the analysers are all equally fast,
otherwise the least performant analyser will
delay the processing, even with some queues,
since they will be limited in size.

Assume you have 2 analyzers, one slow and one
fast, and 4 cores.

Load chunks of 1GB into memory, split each 1GB into
two 500MB halves, and let:

1st 500MB: slow analyzer with core 1 affinity
2nd 500MB: slow analyzer with core 2 affinity
full 1GB:  fast analyzer with core 3 affinity

Or let the 2nd slow analyzer start at 5GB from the
beginning.
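Standard Java has no portable way to pin a thread to a particular core, but
the split itself could be sketched like this (the chunk and the two analyzers
are assumed to exist; the Analyzer interface is made up):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ChunkSplitter {
    interface Analyzer { void analyze(String line); }

    // Two threads share the chunk for the slow analyzer; a third thread runs
    // the fast analyzer over the whole chunk.
    static void analyzeChunk(List<String> chunk, Analyzer slow, Analyzer fast)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        int mid = chunk.size() / 2;
        List<String> firstHalf = chunk.subList(0, mid);
        List<String> secondHalf = chunk.subList(mid, chunk.size());
        pool.submit(() -> firstHalf.forEach(slow::analyze));
        pool.submit(() -> secondHalf.forEach(slow::analyze));
        pool.submit(() -> chunk.forEach(fast::analyze));
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}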

Bye

BTW: Last time I looked at shootout:
http://shootout.alioth.debian.org/

many of the problems now have parallel
implementations. The test machine is 4-core.
If you do a little calculation for the Java
results, you will see that they have an average
core utilization of a factor of 2.6, similar
to the Erlang results.

Maybe you can pick up some ideas of how to
code parallel solutions from there. Otherwise
there is a nice book by Doug Lea:

Concurrent Programming in Java™: Design Principles and Patterns
http://www.amazon.com/Concurrent-Programming-Java-Principles-Pattern/dp/0201310090
 
Arne Vajhøj


As I see it, you need 3 things:
* A single reader thread. That is relatively simple; just be sure to
  read big chunks of data.
* N threads doing M analyses. There are various ways of doing this:
  manually started threads or a thread pool. I think the best choice
  between those will depend on the solution for the next bullet.
* A way of moving data from the reader to the M analyzers.

The first two solutions that come to my mind are:

A1) Use a single java.util.concurrent blocking queue, use
    a custom thread pool, use the command pattern, have
    the reader put M commands on the queue containing the
    same data and the analysis to perform, and have the N threads
    read the commands from the queue and analyze as instructed.
A2) Use the standard ExecutorService thread pool, use the command
    pattern, have the reader submit M commands that are also tasks
    to the executor containing the same data and the analysis
    to perform, and have the N threads read the commands from the
    queue and analyze as instructed (sketched below).
    (A1 and A2 are really the same solution, just slightly different
    implementations)
B)  Use a non-persistent message queue and JMS, use the publish-subscribe
    pattern, have the reader publish the data to the queue, and have a
    multiple of M custom threads, each implementing a single analysis,
    subscribing to the queue, reading and analyzing.

A has less overhead than B. A is more efficient than B if some
analyses take longer than others.

But B can be used in a clustered approach.

(I guess you could do A3 with commands on a message queue and
a thread pool on each cluster member as well)
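A rough sketch of A2, with a made-up Analysis interface standing in for the
individual analyzers (note that Executors.newFixedThreadPool uses an unbounded
work queue, so a ThreadPoolExecutor with a bounded queue would be needed if
the reader must not run too far ahead of the analyzers):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CommandPatternDriver {
    interface Analysis { void run(String line); }

    // The "command": one line of data plus the analysis to perform on it.
    static class AnalysisCommand implements Runnable {
        private final String line;
        private final Analysis analysis;
        AnalysisCommand(String line, Analysis analysis) {
            this.line = line;
            this.analysis = analysis;
        }
        public void run() { analysis.run(line); }
    }

    static void drive(String path, List<Analysis> analyses, int nThreads)
            throws IOException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try (BufferedReader in = new BufferedReader(new FileReader(path))) {
            for (String line = in.readLine(); line != null; line = in.readLine()) {
                for (Analysis a : analyses) {
                    executor.submit(new AnalysisCommand(line, a));  // M commands per line
                }
            }
        } finally {
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        }
    }
}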

Arne
 
Arne Vajhøj


Even though Map-Reduce and Fork-Join are both about parallelizing a
workload, I think they are mostly used for different purposes.

Hadoop is great for processing TB/PB residing on disk
on multiple computers.

Fork Join is great for processing MB/GB residing in
memory on multiple cores on the same computer.
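For what it's worth, a minimal fork/join sketch over a chunk of lines already
loaded into memory (the LineAnalyzer interface and the threshold are invented
for the example):

import java.util.List;
import java.util.concurrent.RecursiveAction;

class AnalyzeChunkTask extends RecursiveAction {
    interface LineAnalyzer { void analyze(String line); }

    private static final int THRESHOLD = 1_000;   // lines handled directly per task
    private final List<String> lines;
    private final int from, to;
    private final LineAnalyzer analyzer;

    AnalyzeChunkTask(List<String> lines, int from, int to, LineAnalyzer analyzer) {
        this.lines = lines;
        this.from = from;
        this.to = to;
        this.analyzer = analyzer;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                analyzer.analyze(lines.get(i));
            }
        } else {
            int mid = (from + to) / 2;   // split the range and recurse in parallel
            invokeAll(new AnalyzeChunkTask(lines, from, mid, analyzer),
                      new AnalyzeChunkTask(lines, mid, to, analyzer));
        }
    }
}

// Usage: new java.util.concurrent.ForkJoinPool()
//            .invoke(new AnalyzeChunkTask(lines, 0, lines.size(), analyzer));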

Arne
 
Robert Klemme

Observer does seem weird for this. Fork / join approaches seem much
more natural.

In the past we had a few issues with TPE because it seemed to try to be
too smart about resource usage (threads can die if min and max are set
differently). I don't remember the details but it might be worth
checking the official bug database.
+1

Yes. Since the OP gives no indication that the analysers needed for each
line can be selected by some sort of fast, simple inspection, about all
you can do is:

foreach line l
    foreach analyser a
        start a thread for a(l)
    wait for all threads to finish

Starting threads and waiting for their termination only makes sense if
all the results need to be processed together. If that's not the case
I'd rather use ThreadPoolExecutor (or something similar which is pretty
easily coded) and just throw tasks into the queue.
At first glance you might think using a queue per analyser would help but,
with the data volumes quoted, that will soon fall apart if any analyser is
more than trivially slower than the rest. As the OP has already said that
some analysers can be much slower due to external interface delays (I
presume that means waiting for DNS queries, etc.), I think he's stuck
with the sort of logic I sketched out. After processing has gotten under
way and any analyser-specific queues have filled up, the performance of
any more complex logic will degrade to the above long before the input
has been completely read and processed.

In summary, don't try to do anything more sophisticated than the above.

I would suggest a change depending on the answer to this question: is
there a fixed format of lines which needs to be parsed identically for
each analysis? If so I'd avoid multiple identical parse steps and write
a variant like this:

foreach line l
    dat = parse(l)
    foreach analyser a
        start a thread for a(dat)
    wait for all threads to finish

If the results of the analyses do not have to be aligned, I'd simply do:

queue = new TPE

foreach line l
    dat = parse(l)
    foreach analyzer a
        queue.execute(a.task(dat))

queue.shutdown
while (!queue.awaitTermination(...)) { /* nop */ }

Kind regards

robert
 
