E
Eric S. Johansson
I was working on a filter for postfix and instead of using the "fork a
new python process on every message" route, I decided to use the SMTP
interface instead and try forking after having started the Python process.
obviously, I needed some way to receive and processed SMTP. smtpd.py
works OK if you are willing to use single threaded one at a time
filtering. Because filtering takes significant amounts of time, I
decided to expand smtpd to support a forking server.
I make no claims to the correctness or suitability of these
modifications or even that I understood wtf was going on with
asyncore/chat. it will also be interesting to see what improvements
others have to suggest. But here's what I found:
it was relatively easy to create a subclass (forkSMTPServer) and inside
of the accept handler, I was able to fork off the child process and
release the connection. I suspect that I haven't released the listen
socket properly but I will look into that.
I also copied the asyncore polling loop into SMTPServer and modified it
to support forking and exit on the same signal as the rest of asyncore.
the last change I added was a close method to SMTPChannel and
process_close method SMTPServer. This is so one can detect when a
channel has closed and do something about it such as ending a child process.
so, in the end a forking smtp receiver looks like:
class filter(smtpd.forkSMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
sleep(5)
log("postfix peer %s %s"%(peer))
log("postfix mail from %s"%(mailfrom))
log("postfix rcpt to %s"%(rcpttos))
#<do something with message here>
print "processing",self.parent_ID
f = filter2(("localhost",40025))
try:
#asyncore.loop(10)
f.loop(10)
print "exit from loop"
except KeyboardInterrupt:
pass
except SystemExit:
pass
except Exception, error:
print str(error)
etype, value, tb = sys.exc_info()
exception_strings = traceback.format_exception(etype, value, tb)
for i in exception_strings:
print(i, 1)
line wrapped patches below:
270,274d269
< def close(self):
< print "outside close"
< asynchat.async_chat.close(self)
< self.__server.process_close()
<
321,385d315
< def process_close (self):
< """override this abstract method to handle the close of SMTP
channel. """
< pass
<
< def loop (self, timeout=30.0, use_poll=0, map=None):
< global f
< if map is None:
< map=asyncore.socket_map
<
< if use_poll:
< if hasattr (select, 'poll'):
< poll_fun = asyncore.poll3
< else:
< poll_fun = asyncore.poll2
< else:
< poll_fun = asyncore.poll
<
< while map:
< try:
< status = os.waitpid(-1,os.WNOHANG)
< print "timeout loop %s %s"% (self.parent_ID, str(status))
< except OSError, error:
< if error[0] == 10:
< status = (0,0)
< else:
< print self.parent_ID, error
<
< if status[0] == 0:
< try:
< poll_fun (timeout, map)
< except asyncore.ExitNow:
< return
<
<
< class forkSMTPServer(SMTPServer):
<
< def __init__(self, address):
<
< self.parent_ID = None
< SMTPServer.__init__(self, address,(0,0))
< self.mychannel = None
<
< def handle_accept(self):
< conn, addr = self.accept()
< self.parent_ID = os.fork()
< if not self.parent_ID:
< print >> DEBUGSTREAM, 'Incoming connection from %s' %
repr(addr)
< self.mychannel = SMTPChannel(self, conn, addr)
< else:
< # close off socket in parent and fake lack of connection
< conn.close()
< self.connected = 0
< print "child fork ID", self.parent_ID
<
< def process_close(self):
<
< print "process my very own close "
<
< # if we are a child process, we are done so exit
< if not self.parent_ID:
< print "exit stage left %s"% (self.parent_ID)
< raise asyncore.ExitNow
<
<
<
new python process on every message" route, I decided to use the SMTP
interface instead and try forking after having started the Python process.
obviously, I needed some way to receive and processed SMTP. smtpd.py
works OK if you are willing to use single threaded one at a time
filtering. Because filtering takes significant amounts of time, I
decided to expand smtpd to support a forking server.
I make no claims to the correctness or suitability of these
modifications or even that I understood wtf was going on with
asyncore/chat. it will also be interesting to see what improvements
others have to suggest. But here's what I found:
it was relatively easy to create a subclass (forkSMTPServer) and inside
of the accept handler, I was able to fork off the child process and
release the connection. I suspect that I haven't released the listen
socket properly but I will look into that.
I also copied the asyncore polling loop into SMTPServer and modified it
to support forking and exit on the same signal as the rest of asyncore.
the last change I added was a close method to SMTPChannel and
process_close method SMTPServer. This is so one can detect when a
channel has closed and do something about it such as ending a child process.
so, in the end a forking smtp receiver looks like:
class filter(smtpd.forkSMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
sleep(5)
log("postfix peer %s %s"%(peer))
log("postfix mail from %s"%(mailfrom))
log("postfix rcpt to %s"%(rcpttos))
#<do something with message here>
print "processing",self.parent_ID
f = filter2(("localhost",40025))
try:
#asyncore.loop(10)
f.loop(10)
print "exit from loop"
except KeyboardInterrupt:
pass
except SystemExit:
pass
except Exception, error:
print str(error)
etype, value, tb = sys.exc_info()
exception_strings = traceback.format_exception(etype, value, tb)
for i in exception_strings:
print(i, 1)
line wrapped patches below:
270,274d269
< def close(self):
< print "outside close"
< asynchat.async_chat.close(self)
< self.__server.process_close()
<
321,385d315
< def process_close (self):
< """override this abstract method to handle the close of SMTP
channel. """
< pass
<
< def loop (self, timeout=30.0, use_poll=0, map=None):
< global f
< if map is None:
< map=asyncore.socket_map
<
< if use_poll:
< if hasattr (select, 'poll'):
< poll_fun = asyncore.poll3
< else:
< poll_fun = asyncore.poll2
< else:
< poll_fun = asyncore.poll
<
< while map:
< try:
< status = os.waitpid(-1,os.WNOHANG)
< print "timeout loop %s %s"% (self.parent_ID, str(status))
< except OSError, error:
< if error[0] == 10:
< status = (0,0)
< else:
< print self.parent_ID, error
<
< if status[0] == 0:
< try:
< poll_fun (timeout, map)
< except asyncore.ExitNow:
< return
<
<
< class forkSMTPServer(SMTPServer):
<
< def __init__(self, address):
<
< self.parent_ID = None
< SMTPServer.__init__(self, address,(0,0))
< self.mychannel = None
<
< def handle_accept(self):
< conn, addr = self.accept()
< self.parent_ID = os.fork()
< if not self.parent_ID:
< print >> DEBUGSTREAM, 'Incoming connection from %s' %
repr(addr)
< self.mychannel = SMTPChannel(self, conn, addr)
< else:
< # close off socket in parent and fake lack of connection
< conn.close()
< self.connected = 0
< print "child fork ID", self.parent_ID
<
< def process_close(self):
<
< print "process my very own close "
<
< # if we are a child process, we are done so exit
< if not self.parent_ID:
< print "exit stage left %s"% (self.parent_ID)
< raise asyncore.ExitNow
<
<
<