We have been asked to develop and application for a client that is a
'notification" system. We would like to use python, but are struggling
to find the right starting point. Any suggestions, tips or sample code
would be appreciated.
Kamaelia [1] sounds like a good fit for what you're doing. (so does Twisted
actually, but in the following I'll describe Kamaelia - why? I'm the
project lead for Kamaelia - however be aware that twisted might also fit
your problem well
[1]
http://kamaelia.sourceforge.net/Home
Assumptions in the following sketch code:
* All clients stay connected to machine b (rather than machine B
connecting to the clients or (say) emailling clients or whatever
you're doing)
* WatchSerialPort is a simple component that wraps your code for
connecting to the serial port and waiting for event
* Each message it sends out represents a unique event, and has been
serialised suitable for sending over a network connection.
* For machine B, you've wrapped up your pre-processing of events as a
component called for sake of argument EventFilter
Application outline;
Machine A is running a "listener" application that is connected to a
another device via the serial post and waits for events. We have not
problem working with the serial port, or the waiting for the event to
happen.
When A received a specific event, it needs to send a message to machine
B that and event has occurred and that type of event.
This is clearly implementable using the following on machine A. (I'll come
back to WatchSerialPort in a minute)
pipeline(
WatchSerialPort(), # the code you described above for connecting
# to the serial port sits in here.
TCPClient("machine.b", 1500),
).run()
Machine B will take the event notification, processes some additional
information (ie. database lookups) and then notify a series of clients
that have "registered" with machine B to receive events.
This sounds like the following code:
Backplane("Events").activate()
pipeline( ### INPUT TO BACKPLANE
SingleServer(port=1500),
EventFilter(),
publishTo("Events"),
).activate()
def subscriber_protocol():
return subscribeTo("Events")
SimpleServer(subscriber_protocol, 1600).run()
Clients then connect to machine B and will receive copies of all events.
Regarding implementing WatchSerialPort. In many respects I'll assume that
you can equate your functionality for getting code to something blocking
like this:
class <... some class >
def formatOutput(self, *args):
return " ".join([ str(x) for x in args ])
def watch_serial_for_events(self)
while 1:
X = raw_input()
if self.matches_event():
print self.formatOutput("We got an event!", X)
If you wanted to turn the above into a Kamaelia component (without adding
shutdown code, etc), it'd look something like:
class WatchSerialPort(Axon.ThreadedComponent.threadedcomponent):
def formatOutput(self, *args):
return " ".join([ str(x) for x in args ])
def main(self)
while 1:
X = raw_input()
if self.matches_event():
self.send( self.formatOutput("We got an event!", X),
"outbox")
The reason for using a threadedcomponent here is because I'm assuming like
raw_input your code for checking the serial port for events is blocking.
Regarding the EventFilter, again if you're using blocking code (such as
database access), then putting that code inside a "threadedcomponent"
baseclass makes sense, if it's not blocking, then using a generator (ie put
a yield somewhere in the main loop), and using a baseclass of "component"
makes sense.
The basic outline for it would look like this - assuming some code that
involves blocking processing:
class EventFilter(threadedcomponent): # I assume something blocking
def PreProcessEvent(self, event): pass # exercise for the reader
def main(self):
while 1:
while self.dataReady("inbox"):
event = self.recv("inbox")
information = self.PreProcessEvent(event) # presumed blocking
self.send( information, "outbox")
if not self.anyReady():
self.pause()
If you want all users to receive copies of all events, that's where this
kinda stops, since despite being sketch (ie untested) code, it's pretty
much all you'd do.
If you're thinking you may have a variety of different messages which you
might want to filter, then if you demultiplex them to different ports on
the same machine, then that's relatively simple to achieve as well, but the
pipeline would need changing to a graphline and the code for EventFilter
would need to do the demultiplexing.
For example, suppose you have 3 types of events people can subscribe to:
Backplane("NormalEvents").activate()
Backplane("ImportantEvents").activate()
Backplane("CriticalEvents").activate()
The event filter to demultiplex these would presumably have a way of already
characterising messages, so lets call that method characterise. The code to
do the demux would then look like this:
class EventFilter(threadedcomponent): # I assume something blocking
Outboxes {
"normalevent" : "Normal priority events are forwarded here",
"importantevent" : Import priority events are forwarded here",
"criticalevent" : "Normal priority events are forwarded here",
"error" : "Stuff we can't classify ends up here",
}
def PreProcessEvent(self, event): pass # exercise for the reader
def characterise(self, event): pass # exercise for the reader
def main(self):
while 1:
while self.dataReady("inbox"):
event = self.recv("inbox")
information = self.PreProcessEvent(event) # presumed blocking
eventtype = self.characterise(event)
if eventtype == "NORMAL":
self.send( information, "normalevent")
elif eventtype == "NORMAL":
self.send( information, "importantevent")
elif eventtype == "NORMAL":
self.send( information, "criticalevent")
else:
self.send( "Error event" + str(information), "error" )
In order to then use this, we now want to publish these events to the 3
backplanes described above, and may wish to merely dump the error to
stdout. This isn't suitable for a pipeline so use a graphline instead
replacing the pipeline "INPUT TO BACKPLANE":
Graphline(
EVENTSIN = SingleServer(port=1500),
EVENTDEMUX = EventFilter(),
NORMALEVENT = publishTo("NormalEvents"),
IMPORTANTEVENT = publishTo("ImportantEvents"),
CRITICALEVENT = publishTo("CriticalEvents"),
ERROROUT = ConsoleEchoer(),
linkages = {
("EVENTSIN","") : ("EVENTDEMUX","inbox"),
("EVENTDEMUX","normalevent") : ("NORMALEVENT","inbox"),
("EVENTDEMUX","importantevent") : ("IMPORTANTEVENT","inbox"),
("EVENTDEMUX","criticalevent") : ("CRITICALEVENT","inbox"),
("EVENTDEMUX","error") : ("ERROROUT","inbox"),
}
).activate()
Finally you'd want 3 simple servers to handle client connections to these
backplanes:
def NormalEventsProtocolFactory(): return subscribeTo("NormalEvents")
def ImportantEventsProtocolFactory(): return subscribeTo("ImportantEvents")
def CriticalEventsProtocolFactory(): return subscribeTo("CriticalEvents")
SimpleServer(NormalEventsProtocolFactory, 1600).activate()
SimpleServer(ImportantEventsProtocolFactory, 1601).activate()
SimpleServer(CriticalEventsProtocolFactory, 1602).run()
[[ As an aside, a nice optimisation to EventFilte would be to have the
result of the self.characterise method be used directly to choose the
correct outbox. ie:
while self.dataReady("inbox"):
event = self.recv("inbox")
information = self.PreProcessEvent(event) # presumed blocking
eventtype = self.characterise(event)
self.send( information, eventtype)
yield 1
But that strikes me as more obfuscated.
Strictly speaking, I suppose it'd make sense to have this as two or 3
components - one to characterise, one to preprocess, and one to demux, but
I see little value here. ]]
We also looked at the asyncore and asynchat, but are having difficulty
gettng starting with these. If someone has a working example of a
"chat" server using these modules they could share it would be very
helpful.
The above sketch code is relatively basic - misses out details like handling
shutdown messages, and the like, but it should be clear works on a model
similar to Unix pipelines. (And yes, things do look uglier when you add in
shutdown, but then it isn't really a simple system really, no matter how
short you try to make it!)
Kamaelia's homepage:
*
http://kamaelia.sourceforge.net/Home
Tutorial for the underlying component model:
*
http://kamaelia.sourceforge.net/MiniAxon/
Cookbook of examples:
*
http://kamaelia.sourceforge.net/Cookbook.html
The collaborative whiteboard code we have is heavily event driven, but isn't
on the web (it's in the main download Bundle though). It can be looked at
view ViewCVS here:
http://kamaelia.cvs.sourceforge.net/kamaelia/Code/Python/Kamaelia/Tools/Whiteboard.py?view=markup
That forwards drawing events to other whiteboards, whilst filtering out
their own events. Each whiteboard implements the same model (and can act as
a client or server or both).
However, it's a much more complex example than:
pipeline(
WatchSerialPort(),
TCPClient("machine.b", 1500),
).run()
Backplane("Events").activate()
pipeline( ### INPUT TO BACKPLANE
SingleServer(port=1500),
EventFilter(),
publishTo("Events"),
).activate()
def subscriber_protocol():
return subscribeTo("Events")
SimpleServer(subscriber_protocol, 1600).run()
For what it's worth if you want an actual chat system:
Server:
~~~~~~~
from Kamaelia.Util.Backplane import *
from Kamaelia.Chassis.ConnectedServer import SimpleServer
Backplane("chatroom").activate()
def listenToRoom():
return subscribeTo("chatroom")
def talkToRoom():
return publishTo("chatroom")
SimpleServer(listenToRoom, 1600).activate()
SimpleServer(talkToRoom, 1601).run()
Client:
~~~~~~~
import sys
from Kamaelia.Util.PipelineComponent import pipeline
from Kamaelia.Internet.TCPClient import TCPClient
from Kamaelia.Util.Console import ConsoleReader, ConsoleEchoer
pipeline(
ConsoleReader(">>> "),
TCPClient(sys.argv[1], 1601),
).activate()
pipeline(
TCPClient(sys.argv[1], 1600),
ConsoleEchoer(),
).run()
That's rather messy (in terms of UI) though - the talk from the chatroom
will overtype the text, so a nice client would be to use a pygame text
display instead (we've got something called a Ticker which would be useful
here):
Client:
~~~~~~~
import sys
from Kamaelia.Util.PipelineComponent import pipeline
from Kamaelia.Internet.TCPClient import TCPClient
from Kamaelia.UI.Pygame.Ticker import Ticker
from Kamaelia.Util.Console import ConsoleReader
pipeline(
ConsoleReader(">>> "),
TCPClient(sys.argv[1], 1601),
).activate()
pipeline(
TCPClient(sys.argv[1], 1600),
Ticker(),
).run()
(Unfortunately, no, we don't have a text edit/entry box for pygame yet
Anyway, hope this was useful/interesting!
Regard,
Michael.