NNTP binary attachment downloader with asyncore and generators


fishboy

Howdy,

I'm in the middle of a personal project. Eventually it will download
multipart binary attachments and look for missing parts on other
servers. So far I've got it to walk a newsgroup and download and
decode single-part binaries.

I thought I'd post the code and see what people think. I'd appreciate
any feedback. It's my first program with generators, and I'm worried
I'm making this twice as hard as it needs to be.

Thanks,
David Fisher

Oh yeah, email is fake since I'm deathly afraid of spam. Please post
replies here.

Code was working when I posted it. Just change the server name,
username, password, group info at the bottom to something less
virtual. :)


#!/usr/bin/env python2.3
#
import asyncore
import socket
import os
import uu
import email
#
class Newser(asyncore.dispatcher):

    def __init__(self, host,port,user,password,group):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect( (host,port) )
        self.buffer = ''
        self.user = user
        self.password = password
        self.group = group
        self.n = 0
        self.head = ''
        self.body = ''
        self.inbuffer = ''
        self.dataline = ''
        self.handleline = self.handleline_gen()

    def handle_connect(self):
        pass

    def handle_close(self):
        pass

    def writable(self):
        return (len(self.buffer) > 0)

    def handle_write(self):
        print 'sending: ' + self.buffer.strip()
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]
        if self.buffer:
            print 'didnt send whole line' #does this ever happen?
            print 'didnt send whole line' #just getting my attention
            print 'didnt send whole line' #in case it does

    def handle_read(self):
        self.inbuffer += self.recv(8192)
        while 1:
            n = self.inbuffer.find('\r\n')
            if n > -1:
                self.dataline = self.inbuffer[:n+2]
                self.inbuffer = self.inbuffer[n+2:]
                try:
                    result = self.handleline.next()
                    if result == 'OK':
                        pass # everything is groovy
                    elif result == 'DONE':
                        self.del_channel() # group walk is finished
                        break
                    else:
                        print 'something has gone wrong!'
                        print result
                        print self.dataline
                        self.del_channel()
                        break
                except StopIteration:
                    print 'should never be here'
                    print 'why did my generator run out?'
                    print 'why god? why?!'
                    print self.dataline
                    self.del_channel()
                    break
            else:
                break

    def handleline_gen(self):
        #
        # handshakey stuff
        # welcome username password group
        # after this is set we'll start the message walk
        #
        if self.dataline[:3] == '200': # welcome, post ok
            print self.dataline.strip()
            self.buffer = 'authinfo user ' + self.user + '\r\n'
            yield 'OK'
        else:
            yield 'WTF?! fail welcome? god hates me!'
        #
        if self.dataline[:3] == '381': # more auth needed
            print self.dataline.strip()
            self.buffer = 'authinfo pass ' + self.password + '\r\n'
            yield 'OK'
        else:
            yield 'WTF?! fail authinfo user'
        #
        if self.dataline[:3] == '281': # auth ok, go to town!
            print self.dataline.strip()
            self.buffer = 'group ' + self.group + '\r\n'
            yield 'OK'
        else:
            yield 'WTF?! fail authinfo pass'
        #
        if self.dataline[:3] == '211': # group
            print self.dataline.strip()
            self.buffer = 'next\r\n'
            yield 'OK'
        else:
            yield 'WTF?! fail group'
        #
        # main state loop
        # walk from one message to the next
        # issuing HEAD and BODY for each
        # never reenter here after we receive '421', no next article
        # so we should never issue StopIterator
        #
        while 1:
            #
            if self.dataline[:3] == '223': # next
                print self.dataline.strip()
                self.buffer = 'head\r\n'
                yield 'OK'
            elif self.dataline[:3] == '421': # err, no next article
                yield 'DONE'
            else:
                yield 'WTF?! fail next'
            #
            if self.dataline[:3] == '221': # head
                print self.dataline.strip()
                self.head = ''
                yield 'OK'
                # XXX what am I going to do if the server explodes
                while self.dataline <> '.\r\n':
                    self.head += self.dataline
                    yield 'OK'
                # XXX parse headers here
                # XXX decide whether we want body
                self.buffer = 'body\r\n'
                yield 'OK'
            else:
                yield 'WTF?! fail head'
            #
            if self.dataline[:3] == '222': # body
                print self.dataline.strip()
                self.body = ''
                yield 'OK'
                # XXX what am I going to do if the server explodes
                while self.dataline <> '.\r\n':
                    # XXX line-by-line decode here (someday)
                    self.body += self.dataline
                    yield 'OK'
                self.decode()
                self.buffer = 'next\r\n'
                yield 'OK'
            else:
                yield 'WTF?! fail body'

    def decode(self):
        """decode message body.
        try UU first, just decode body
        then mime, decode head+body
        save in tempfile if fail"""
        tempname = 'temp' + `self.n` + '.txt'
        self.n += 1
        file(tempname,'wb').write(self.body)
        f = file(tempname)
        try:
            uu.decode(f)
        except Exception,v:
            print 'uu failed code: ',v
            print 'trying MIME'
            file(tempname,'wb').write(self.head+self.body)
            f = file(tempname)
            message = email.message_from_file(f)
            for part in message.walk():
                print part.get_content_type()
                filename = part.get_filename()
                if filename:
                    if not os.path.isfile(filename):
                        file(filename,'wb').write(part.get_payload(decode=True))
                        print 'yay! MIME!'
                        os.remove(tempname)
                    else:
                        print "oops, we've already got one"
        else:
            print 'yay! UU!'
            os.remove(tempname)

def main():
    mynews = Newser('news.server',119,'fishboy','pass','alt.binaries')
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        mynews.del_channel()
        print 'yay! I quit!'

if __name__ == '__main__':
    main()
 

Josiah Carlson

Fishboy,

Good start.
        self.buffer = ''
        self.inbuffer = ''
        self.dataline = ''

Depending on how much data you are looking to move, plain strings and
string concatenation are not always the fastest way to deal with things.

For incoming buffers...

self.inbuffer = []
#as we receive data
self.inbuffer.append(data)
#when we have as much as we need...
data = ''.join(self.inbuffer)
self.inbuffer = []

    def handle_close(self):
        pass

Um...you are going to want to actually handle that close...with a
'self.close()'

    def handle_write(self):
        print 'sending: ' + self.buffer.strip()
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]
        if self.buffer:
            print 'didnt send whole line' #does this ever happen?
            print 'didnt send whole line' #just getting my attention
            print 'didnt send whole line' #in case it does

Try sending a few megabytes to it; you'll find the upper limit of the
amount you can send at any one time.

Generally, it all depends on both the configuration of your TCP/IP
implementation (sending window size), as well as the actual throughput
and latencies of the connection to the other machine.

What asynchat does (and many other libraries do) is to pre-partition the
data into small chunks. Asynchat sticks with 512 bytes (a little small
IMO), but one would even be conservative at 1024 bytes (ethernet frames
are ~1500, so there is even some headroom). Tune it based on what your
connection is doing over time; this is Python.

Also, use a real FIFO class for buffering. You can modify Raymond's
fastest fifo implementation listed here:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/68436
to insert those blocks that are accidentally not sent completely.
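A minimal sketch of the chunk-and-requeue idea described above, written for current Python 3 rather than the 2.3 used in the post. It is not Raymond's recipe: `collections.deque` stands in as the FIFO, and the `OutgoingQueue` class, its method names, and the 1024-byte `CHUNK_SIZE` are hypothetical names chosen for illustration. The `send` argument is assumed to behave like a socket's `send()`, returning the number of bytes it accepted.

```python
from collections import deque

CHUNK_SIZE = 1024  # assumed block size; tune it for your connection


class OutgoingQueue:
    """FIFO of small byte blocks waiting to be sent (hypothetical helper)."""

    def __init__(self, chunk_size=CHUNK_SIZE):
        self.chunk_size = chunk_size
        self.blocks = deque()

    def __len__(self):
        # Number of blocks still waiting; 0 means nothing left to write.
        return len(self.blocks)

    def push(self, data):
        # Pre-partition the data into blocks of at most chunk_size bytes.
        for start in range(0, len(data), self.chunk_size):
            self.blocks.append(data[start:start + self.chunk_size])

    def send_one(self, send):
        """Send the oldest block; requeue whatever was not accepted."""
        if not self.blocks:
            return 0
        block = self.blocks.popleft()
        sent = send(block)
        if sent < len(block):
            # Partial send: put the unsent tail back at the front.
            self.blocks.appendleft(block[sent:])
        return sent
```

In a dispatcher, `writable()` would presumably return `len(queue) > 0` and `handle_write()` would call `queue.send_one(self.send)`, so a short send never loses data or reorders it.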
    def handle_read(self):

Check out the handle_read() method of asynchat.async_chat. It does some
really good things to make line-based protocols work well, and there are
even some optimizations that can be done for your specific protocol.

        self.inbuffer += self.recv(8192)

Don't do the above. Not a good idea.
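One possible alternative, as a rough sketch in current Python 3: collect the received chunks in a list, join them once per read, and split off every complete line in a single pass instead of slicing the string once per line. The `LineBuffer` class and its `feed` method are hypothetical names, and this is only one reading of the advice, not what asynchat itself does. Note that the returned lines have the terminator stripped, so the end of a multi-line response shows up as `b"."` rather than `'.\r\n'`.

```python
class LineBuffer:
    """Collects received chunks and hands back complete lines
    (hypothetical helper, not part of asyncore or asynchat)."""

    def __init__(self, terminator=b"\r\n"):
        self.terminator = terminator
        self.chunks = []  # pieces received since the last complete line

    def feed(self, data):
        """Add one recv() result; return the complete lines now available."""
        self.chunks.append(data)
        buffered = b"".join(self.chunks)
        # Everything before the last terminator is a complete line;
        # the final piece is a partial line (possibly empty) to keep.
        *lines, rest = buffered.split(self.terminator)
        self.chunks = [rest] if rest else []
        return lines
```

A `handle_read()` built on this would call `feed(self.recv(8192))` and advance the generator once for each returned line.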

The generator method looks reasonable, but I admit, I didn't read too
deeply (I really should be going to bed).

Keep working with sockets; they can be fun (if frustrating at
times). If at any point you contemplate going towards a threaded
server, just remember:
1. Asyncore (modified properly) can handle hundreds of connections (I'm
currently limited by the number of file handles Python is compiled with)
and saturate 100 Mbit with ease (I have written clients that saturate
1 Gbit for certain tasks).
2. Standard Python threads get laggy, and actually reduce bandwidth as
you approach 10-20 threads (word is that Stackless' tasklets are damn fast).


- Josiah
 
