multithreading - managing transactions and sequence of processing

Jaco Smuts

Hello there

question: I'm looking for information / patterns / recipes for
implementing a multithreaded program (multiple producers / consumers)
that will manage transactions and maintain sequence across the different
threads.

background:
I'm busy writing an adapter taking messages from IBM WebSphere MQ using
pymqi and inserting these messages into a database (after some
processing; the messages come in as XML). If I comment out the database
.execute bit of the code, I process hundreds of messages per second; the
database slows the whole process down to around 60 - 100 messages per
second. I'm hoping to speed this up some by having 2 threads reading the
messages from MQ, and another populating the database - at least then
the database bit will never have to wait for me to fetch another
message. (I'm thinking of later adding a second thread for the database
as well.)

The challenge is that these messages need to be processed in exact
sequence and transactionally (i.e. the thread that fetched a message
will have to wait for confirmation that the message has successfully
been inserted into the database before committing the transaction; in MQ
terms the message will only really be deleted then).

I've started implementing a kind of state machine using dictionaries to
facilitate this, but I'm now concerned that
- there is a better way
- I might get bitten by dictionary updates / setting of values not
necessarily being thread safe.

I've looked at this recipe
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/302997 as a
possible replacement for dictionaries, but I'm not even sure if I'm using
it correctly.

thank you
jaco
 
Dennis Lee Bieber

Caveat: I'm not the most skilled at threading but...
> I'm hoping to speed this up some by having 2 threads reading the
> messages from mq

About the only /easy/ way to have two readers and still maintain
seriality on the database will require some sort of locking mechanism so
only one reader at a time can queue the data to the database thread --
and that lock also has to ensure that the reads stay in order. You've
just lost the advantage of parallel threads. Maybe using two overlapping
locks:
    loop:
        read.acquire()
        get data
        queue.acquire()
        read.release()
        queue data
        queue.release()
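
A minimal Python sketch of the two overlapping locks above, for illustration only. It assumes a hypothetical shared iterator `source` standing in for the MQ get call and a `queue.Queue` feeding the database thread; `reader`, `run_readers`, and `db_queue` are made-up names, and nothing here is pymqi code.

```python
import queue
import threading

read_lock = threading.Lock()    # serializes fetching from the source
queue_lock = threading.Lock()   # serializes hand-off to the database queue
db_queue = queue.Queue()

def reader(source):
    """Fetch items one at a time and enqueue them in fetch order.

    `source` is a hypothetical shared iterator standing in for the MQ get.
    """
    while True:
        read_lock.acquire()
        try:
            item = next(source, None)   # "get data"
            if item is None:
                return                  # source exhausted
            # Take the second lock before dropping the first, so no other
            # reader can fetch a later item and enqueue it ahead of this one.
            queue_lock.acquire()
        finally:
            read_lock.release()
        try:
            db_queue.put(item)          # "queue data"
        finally:
            queue_lock.release()

def run_readers(items, n_readers=2):
    """Run n_readers threads over one shared source; return enqueue order."""
    source = iter(items)
    threads = [threading.Thread(target=reader, args=(source,))
               for _ in range(n_readers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [db_queue.get() for _ in range(db_queue.qsize())]
```

Because each reader holds one of the two locks for the whole fetch-and-enqueue span, the readers overlap only between fetching one item and enqueuing the previous one, which is why little parallelism is left.
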
> and another populating the database - at least then
> the database bit will never have to wait for me to fetch another
> message. (i'm thinking of later adding a second thread for the database
> as well).

Same problem -- you demand serialness, and to get that between two
writers requires mutual locking. Goodbye to the advantage of
parallelness.

> The challenge is that these messages need to be processed in exact
> sequence and transactionally. (ie. the thread that fetched a message
> will have to wait for confirmation that the message has successfully
> been inserted into the database before committing the transaction, in mq
> terms the message will only really be deleted then.)

I'm vague on the exact processing here: does the reader have to
block until the database confirmation? Or do you only mean that some
response to the message has to wait for the database commit?

If the latter, I'd probably use three threads and two queues.

reader:
    loop
        raw = get message       # I presume they have some ID for response
        message = parse(raw)    # into database field list
        inq.put(message)

database:
    loop
        sleep()     # some nominal time to ensure other threads run
        ms = []
        for i in range(inq.qsize()):
            ms.append(inq.get())
        if ms:
            database.executemany(INSERT, ms)
            database.commit()
            outq.put(ms)    # or just the IDs needed for response

confirmer:
    loop
        committed = outq.get()
        for m in committed:
            confirm_delete(m.ID)


Assuming the tie-up is the database .execute() running on a single
message at a time, collecting batches of messages and using an
.executemany() might speed that phase up. Rather than /n/ individual
.execute()/.commit() cycles, you only have one, slightly larger
.executemany()/.commit().
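
As a rough illustration of the batching idea, here is a sketch using the standard-library `sqlite3` module as a stand-in for the real database. The `messages` table, its columns, and the `insert_batch` helper are assumptions made up for this example; the actual schema and driver are not given in this thread.

```python
import sqlite3

def insert_batch(conn, rows):
    """Insert all rows with one executemany() call and one commit()."""
    # Hypothetical table and columns, for illustration only.
    conn.executemany("INSERT INTO messages (id, body) VALUES (?, ?)", rows)
    conn.commit()

# In-memory database so the sketch runs without any setup.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, body TEXT)")

insert_batch(conn, [(1, "<a/>"), (2, "<b/>"), (3, "<c/>")])
stored = conn.execute("SELECT id FROM messages ORDER BY id").fetchall()
```

Whether this helps in practice depends on the database and driver, so it is worth timing against the one-message-per-commit version.
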

With only one thread doing each phase, and data passing via queues,
seriality is maintained.
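
The three-thread, two-queue layout could be sketched in Python roughly as follows. This is a sketch under assumptions, not a tested adapter: `store_batch` and `confirm` are hypothetical callables standing in for the database insert and the MQ confirmation, the `STOP` sentinel is added here only so the threads can shut down cleanly, and a blocking `get()` replaces the `sleep()` polling. Whether the confirmation may be issued from a different thread than the one that fetched the message depends on the MQ client and is not addressed here.

```python
import queue
import threading

STOP = object()  # sentinel marking end of input (added for clean shutdown)

def reader(messages, inq):
    """Stage 1: fetch messages and pass them on in arrival order."""
    for raw in messages:            # stands in for the MQ get loop
        inq.put(raw)                # parsing would happen here
    inq.put(STOP)

def database(inq, outq, store_batch):
    """Stage 2: drain whatever is queued, store it as one batch, report it."""
    done = False
    while not done:
        batch = [inq.get()]         # block until at least one item arrives
        while True:                 # then take everything already waiting
            try:
                batch.append(inq.get_nowait())
            except queue.Empty:
                break
        if batch[-1] is STOP:       # single reader, so STOP is always last
            done = True
            batch.pop()
        if batch:
            store_batch(batch)      # e.g. executemany() + commit()
            outq.put(batch)
    outq.put(STOP)

def confirmer(outq, confirm):
    """Stage 3: confirm each message only after its batch has been stored."""
    while True:
        committed = outq.get()
        if committed is STOP:
            return
        for m in committed:
            confirm(m)

def run_pipeline(messages, store_batch, confirm):
    """Wire the three stages together and wait for them to finish."""
    inq, outq = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=reader, args=(messages, inq)),
        threading.Thread(target=database, args=(inq, outq, store_batch)),
        threading.Thread(target=confirmer, args=(outq, confirm)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

For example, `run_pipeline(range(50), stored.append, confirmed.append)` with two empty lists collects the batches in `stored` and the confirmed messages, in their original order, in `confirmed`.
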

--
Wulfraed Dennis Lee Bieber KD6MOG
(e-mail address removed) (e-mail address removed)
HTTP://wlfraed.home.netcom.com/
(Bestiaria Support Staff: (e-mail address removed))
HTTP://www.bestiaria.com/
 
