Threading.Condition problem


Gabriel Rossetti

Sorry if this appears twice, I sent it once with an attachment and it
never arrived so maybe the attachment is posing problems. I inlined the
code this time (at the bottom), thank you,

Gabriel

########################## Original message ############################

Hello everyone,

I wrote a small example that listens for xmpp msgs in a thread. The main
program calls a function that blocks (using Condition.wait) until a msg
has been received and then returns the msg. When a msg arrives, it is
put in a variable in the thread's object, it then calls the notify()
attr on the Condition object. For some reason, this doesn't work, the
thread gets the msg, tries to notify the Condition object, fails because
the lock has not been acquired yet and blocks. I tried ignoring the
failure, thinking that since it has not been acquired yet then when it
is, it will get the msg right away and never call Condition.wait, thus
not causing any problems, but this does not work either. Does someone
know what I am doing wrong? I attached the code to this msg.

Thank you,
Gabriel



############################ Example code ############################

from __future__ import with_statement
import xmpp, sys
from threading import Thread, Condition, Event


class Listener(Thread):
    def __init__(self, ws):
        Thread.__init__(self)
        self.interrupt = Event()
        self.message = None
        self._cv = ws._cv
        self.client = ws._client
        self.client.RegisterHandler('message', self.onMessage)

    def onMessage(self, conn, msg):
        self.message = msg
        try:
            self._cv.notify()
        except RuntimeError:
            print "self._cv has not acquired the lock yet"

    def getMsg(self):
        return self.message

    def run(self):
        try:
            while(not self.interrupt.isSet()):
                self.client.Process(1)
        except KeyboardInterrupt:
            return 0

class WS(object):
    def __init__(self, username, password, res):
        self._jid = xmpp.protocol.JID(username)
        self._client = xmpp.Client(self._jid.getDomain())
        self._cv = Condition()

        if(self._client.connect(server=("localhost", 5222)) == ""):
            raise Exception("Error while connecting!")

        if(self._client.auth(self._jid.getNode(), password, res) is None):
            raise Exception("Authentication failed!")

        self._client.sendInitPresence()

        self._listener = Listener(self)
        self._listener.start()

    def getMsg(self, mid=None):
        """
        """
        with self._cv:
            res = self._listener.getMsg()
            while not res:
                self._cv.wait()
                res = self._listener.getMsg()
        return res

if(__name__ == "__main__"):
    ws = WS("test@localhost", "123", "test")
    res = ws.getMsg()
    print "I just received : %s" % str(res)
    sys.exit(0)
 

Piet van Oostrum

Gabriel Rossetti said:
GR> Sorry if this appears twice, I sent it once with an attachment and it never
GR> arrived so maybe the attachment is posing problems. I inlined the code this
GR> time (at the bottom), thank you,
GR> Gabriel
GR> ########################## Original message ############################
GR> Hello everyone,
GR> I wrote a small example that listens for xmpp msgs in a thread. The main
GR> program calls a function that blocks (using Condition.wait) until a msg
GR> has been received and then returns the msg. When a msg arrives, it is
GR> put in a variable in the thread's object, it then calls the notify()
GR> attr on the Condition object. For some reason, this doesn't work, the
GR> thread gets the msg, tries to notify the Condition object, fails because
GR> the lock has not been acquired yet and blocks. I tried ignoring the
GR> failure, thinking that since it has not been acquired yet then when it
GR> is, it will get the msg right away and never call Condition.wait, thus
GR> not causing any problems, but this does not work either. Does someone
GR> know what I am doing wrong? I attached the code to this msg.

The code that puts the message in the variable should also acquire the
lock:


def onMessage(self, conn, msg):
    with self._cv:
        self.message = msg
        self._cv.notify()
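
The rule can be checked in isolation. The following is a minimal sketch in Python 3 syntax, without the xmpp dependency; the variable names are made up for illustration and it is not part of the original program:

```python
import threading

cv = threading.Condition()

# Calling notify() without holding the condition's lock raises RuntimeError.
try:
    cv.notify()
    raised = False
except RuntimeError:
    raised = True

# Holding the lock (here via the with statement) makes the same call legal.
with cv:
    cv.notify()  # no thread is waiting yet, so this is a harmless no-op
```

This is the same RuntimeError that the original onMessage catches and prints; acquiring the lock first removes the cause rather than hiding the symptom.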

A couple of remarks:

1. I think the code is neater if all manipulation with the condition is
done in the same class (actually in the same instance -- making this
instance into a monitor).

class Listener(Thread):
    def __init__(self, ws):
        Thread.__init__(self)
        self.interrupt = Event()
        self.message = None
        self._cv = Condition()   # the condition now lives in the Listener
        self.client = ws._client
        self.client.RegisterHandler('message', self.onMessage)

    def onMessage(self, conn, msg):
        with self._cv:
            self.message = msg
            # The lock is held here, so notify() cannot raise RuntimeError.
            self._cv.notify()

    def getMsg(self):
        with self._cv:
            while not self.message:
                self._cv.wait()
            return self.message

    # run() stays as in the original

class WS(object):
    def __init__(self, username, password, res):
        self._jid = xmpp.protocol.JID(username)
        self._client = xmpp.Client(self._jid.getDomain())
        # self._cv = Condition()
        # ... connect, authenticate and start the Listener as before ...

    def getMsg(self, mid=None):
        """
        """
        return self._listener.getMsg()

Of course I haven't tested this code as I don't have the context
modules.

2. I don't know if more than one message can be delivered to the same
instance. If so, then your code will not work, and neither will the
code above, as the message instance variable is never cleared. So the
next getMsg will happily deliver the previous message again.
You would have to clear it when returning it.

def getMsg(self):
    with self._cv:
        while not self.message:
            self._cv.wait()
        msg = self.message
        self.message = None
        return msg

3. If the messages come in faster than they can be processed, some will
be lost, as each new one overwrites the previous one in the self.message
variable. The solution is to use a Queue.Queue (from the Queue module;
queue.Queue in Python 3) to transfer the messages from one thread to
the other. This also saves you the hassle of doing your own
synchronisation like above. If you are not familiar with synchronising
multithreaded applications it is very easy to make errors, and even if
you are it is still quite easy to get it wrong. I have been involved in
distributed programming courses at university level and I have seen
many errors in this area.
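
A minimal sketch of the queue approach, in Python 3 syntax (where the module is named queue). The listener function below is a hypothetical stand-in for the XMPP callback, not the real xmpp API:

```python
import queue
import threading

messages = queue.Queue()  # thread-safe FIFO; it does its own locking

def listener(n):
    # Stand-in for onMessage: every message is queued, none is overwritten.
    for i in range(n):
        messages.put("msg %d" % i)

t = threading.Thread(target=listener, args=(5,))
t.start()

# get() blocks until an item is available; the timeout guards against hanging.
received = [messages.get(timeout=5) for _ in range(5)]
t.join()
```

With a single producer the messages come out in the order they were put in, and no explicit Condition or lock appears in the user code.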
 

Gabriel Rossetti

Piet said:
PvO> The code that puts the message in the variable should also acquire the
PvO> lock:
PvO>
PvO> def onMessage(self, conn, msg):
PvO>     with self._cv:
PvO>         self.message = msg
PvO>         self._cv.notify()

Thank you, that was the problem. I eventually found that out.

PvO> A couple of remarks:
PvO>
PvO> 1. I think the code is neater if all manipulation with the condition is
PvO> done in the same class (actually in the same instance -- making this
PvO> instance into a monitor).

The reason I didn't do that is that I don't want the Listener to sleep.
Maybe I oversimplified the example: I actually put the msgs in a
dictionary as they come in, so in your example, if I have several
threads waiting on msgs, it wouldn't work. I'm trying to make a
webservice API that will also be turned into a Java .jar for people that
need Java. Now that I think about it, each session will have its own
instance of the object, so msgs shouldn't get mixed up (one connection
per user), and I could block in the thread. I'll try your suggestion as
I think it is cleaner.

PvO> 2. I don't know if more than one message can be delivered to the same
PvO> instance. If so, then your code will not work, and neither will the
PvO> code above, as the message instance variable is never cleared. So the
PvO> next getMsg will happily deliver the previous message again.
PvO> You would have to clear it when returning it.

Like I said above, in reality I have a dict, not just a simple variable.

PvO> 3. If the messages come in faster than they can be processed, some will
PvO> be lost, as each new one overwrites the previous one in the self.message
PvO> variable. The solution is to use a Queue.Queue (from the Queue module;
PvO> queue.Queue in Python 3) to transfer the messages from one thread to
PvO> the other. This also saves you the hassle of doing your own
PvO> synchronisation like above. If you are not familiar with synchronising
PvO> multithreaded applications it is very easy to make errors, and even if
PvO> you are it is still quite easy to get it wrong. I have been involved in
PvO> distributed programming courses at university level and I have seen
PvO> many errors in this area.

I used a dict because the API can also be set up to be asynchronous and
use callbacks, so I had to be able to access the msgs directly and
quickly.

Gabriel
 

Piet van Oostrum

Gabriel Rossetti said:
GR> Piet van Oostrum wrote: ....
GR> I wrote a small example that listens for xmpp msgs in a thread. The main
GR> program calls a function that blocks (using Condition.wait) until a msg
GR> has been received and then returns the msg. When a msg arrives, it is
GR> put in a variable in the thread's object, it then calls the notify()
GR> attr on the Condition object. For some reason, this doesn't work, the
GR> thread gets the msg, tries to notify the Condition object, fails because
GR> the lock has not been acquired yet and blocks. I tried ignoring the
GR> failure, thinking that since it has not been acquired yet then when it
GR> is, it will get the msg right away and never call Condition.wait, thus
GR> not causing any problems, but this does not work either. Does someone
GR> know what I am doing wrong? I attached the code to this msg.
GR> Thank you, that was the problem, I eventually found that
GR> The reason I didn't do that is that I don't want the Listener to sleep, I
GR> maybe over simplified the example, I actually put them in a dictionary as
GR> they come in, so in your example, if I have several threads waiting on msgs
GR> it wouldn't work. I'm trying to make a webservice api that will also be
GR> turned into a java .jar for people that need java. Now that I think about
GR> it, each session will have an instance of the object so msgs shouldn't get
GR> mixed up (one connection per user), so I could block in the thread. I'll
GR> try your suggestion as I think it is cleaner.

Sleeping, as you call it, is better than busy waiting. You need some
synchronisation to make it efficient.
If you put the messages in a dictionary, access to the dictionary must be
protected. Having several threads waiting for messages doesn't
prevent you from using proper synchronisation. You may need to use
notifyAll() (also spelled notify_all() from Python 2.6 on) instead of
notify(), so that every waiting thread wakes up and checks whether its
own message has arrived.
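
A minimal sketch of a lock-protected dictionary with several waiting threads, in Python 3 syntax. The MessageStore class, its method names and the message ids are hypothetical and only illustrate the idea; they are not taken from the real program:

```python
import threading

class MessageStore:
    """Hypothetical dict of messages keyed by id, guarded by one Condition."""

    def __init__(self):
        self._cv = threading.Condition()
        self._messages = {}

    def put(self, mid, msg):
        with self._cv:
            self._messages[mid] = msg
            # Wake every waiter; each one re-checks for its own id.
            self._cv.notify_all()

    def get(self, mid, timeout=None):
        with self._cv:
            # wait_for re-tests the predicate after every wake-up.
            if not self._cv.wait_for(lambda: mid in self._messages, timeout):
                raise TimeoutError("no message with id %r" % (mid,))
            return self._messages.pop(mid)

store = MessageStore()
results = {}

def waiter(mid):
    results[mid] = store.get(mid, timeout=5)

threads = [threading.Thread(target=waiter, args=(m,)) for m in ("a", "b")]
for t in threads:
    t.start()
store.put("b", "second")
store.put("a", "first")
for t in threads:
    t.join()
```

With plain notify() only one waiter is woken per call, and it may be a thread waiting for a different id; notify_all() combined with a re-checked predicate avoids that.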
 
