RFC: my iterthreader module

Justin Azoff

I have this iterthreader module that I've been working on for a while
now. It is similar to itertools.imap, but it runs the function calls in
a small pool of worker threads (two by default) and uses Queues for
moving the data around. A better name for it would probably be
ithreadmap, but anyway...

The short explanation is that if you have a loop like

for item in biglist:
    print "The value for %s is %s" % (item, slowfunc(item))

or

for item,val in ((item, slowfunc(item)) for item in biglist):
    print "The value for %s is %s" % (item, val)

you can simply rewrite it as

for item,val in iterthreader.Threader(slowfunc, biglist):
    print "The value for %s is %s" % (item, val)

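and it will hopefully run faster. The usual GIL issues still apply, of
course, so the gain comes mostly when slowfunc spends its time waiting
on I/O rather than computing. Results also come back in the order they
finish, not in the order of biglist. You can subclass it in various
ways, but I almost always just call it in the above manner.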
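
To make the idea concrete, here is a minimal sketch of the same pattern as a generator, written for Python 3 (the module itself is Python 2). The names threaded_map and slowfunc are hypothetical stand-ins, not part of iterthreader, and slowfunc just sleeps to imitate an I/O-bound call.

```python
import queue
import threading
import time


def threaded_map(func, iterable, numthreads=2):
    """Yield (item, func(item)) pairs computed by worker threads.

    Hypothetical helper for illustration, not part of iterthreader.
    Pairs are yielded in completion order, not input order.
    """
    if numthreads < 1:
        raise ValueError("numthreads should be greater than 0")

    work = queue.Queue()   # items waiting for a worker
    done = queue.Queue()   # finished (item, result) pairs and stop markers
    stop = object()        # unique marker, cannot collide with real data

    def worker():
        try:
            while True:
                item = work.get()
                if item is stop:
                    break
                done.put((item, func(item)))
        finally:
            # Always report that this worker has ended, even if func raised.
            done.put(stop)

    threads = [threading.Thread(target=worker, daemon=True)
               for _ in range(numthreads)]
    for t in threads:
        t.start()
    for item in iterable:
        work.put(item)
    for _ in threads:
        work.put(stop)      # one stop marker per worker

    remaining = numthreads
    while remaining:
        entry = done.get()
        if entry is stop:
            remaining -= 1
        else:
            yield entry
    for t in threads:
        t.join()


def slowfunc(item):
    """Stand-in for an I/O-bound call: sleeping releases the GIL."""
    time.sleep(0.05)
    return item * 2


start = time.time()
# Sorting restores input order, since pairs arrive as they finish.
pairs = sorted(threaded_map(slowfunc, range(8), numthreads=4))
# The sleeps overlap, so this should take well under 8 * 0.05 seconds.
elapsed = time.time() - start
```

A CPU-bound slowfunc would not overlap like this, because only one thread can run Python bytecode at a time.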

So, can anyone find any obvious problems with it? I've been meaning to
re-post it [1] to the Python Cookbook, but I'd like to hear what
others think first. I'm not aware of any other module that makes this
particular use of threading this simple.

[1] I _think_ I posted it before, but that may have just been in a
comment.

import threading
import Queue

class Threader:
    def __init__(self, func=None, data=None, numthreads=2):
        if not numthreads > 0:
            raise AssertionError("numthreads should be greater than 0")

        # Compare against None so an empty list or other falsy input still counts.
        if func is not None:
            self.handle_input = func
        if data is not None:
            self.get_input = lambda: data

        self._numthreads = numthreads
        self.threads = []
        self.run()

    def __iter__(self):
        return self

    def next(self):
        still_running, input, output = self.DQ.get()
        if not still_running:
            raise StopIteration
        return input, output

    def get_input(self):
        raise NotImplementedError, "You must implement get_input as a function that returns an iterable"

    def handle_input(self, input):
        raise NotImplementedError, "You must implement handle_input as a function that returns anything"

    def _handle_input(self):
        # Worker loop: pull items from Q until a (False, None) stop marker arrives.
        while 1:
            work_todo, input = self.Q.get()
            if not work_todo:
                break
            self.DQ.put((True, input, self.handle_input(input)))

    def cleanup(self):
        """wait for all threads to stop and tell the main iter to stop"""
        for t in self.threads:
            t.join()
        self.DQ.put((False, None, None))

    def run(self):
        self.Q = Queue.Queue()
        self.DQ = Queue.Queue()
        for x in range(self._numthreads):
            t = threading.Thread(target=self._handle_input)
            t.start()
            self.threads.append(t)

        try:
            for x in self.get_input():
                self.Q.put((True, x))
        except NotImplementedError, e:
            print e
        # One stop marker per worker thread.
        for x in range(self._numthreads):
            self.Q.put((False, None))

        threading.Thread(target=self.cleanup).start()
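
Since subclassing is mentioned but not shown, here is a rough sketch of how the get_input and handle_input hooks could be overridden. To keep it self-contained and runnable on Python 3, it includes a port of the class (queue instead of Queue, next renamed to __next__, func and data tested against None). The subclass name WordLengths and its word list are made up for illustration.

```python
import queue
import threading


class Threader:
    """Python 3 port of the posted class, same structure and hooks."""

    def __init__(self, func=None, data=None, numthreads=2):
        if not numthreads > 0:
            raise AssertionError("numthreads should be greater than 0")
        if func is not None:
            self.handle_input = func
        if data is not None:
            self.get_input = lambda: data
        self._numthreads = numthreads
        self.threads = []
        self.run()

    def __iter__(self):
        return self

    def __next__(self):
        still_running, item, output = self.DQ.get()
        if not still_running:
            raise StopIteration
        return item, output

    def get_input(self):
        raise NotImplementedError("get_input must return an iterable")

    def handle_input(self, item):
        raise NotImplementedError("handle_input must return a value")

    def _handle_input(self):
        # Worker loop: stop on the first (False, None) marker.
        while True:
            work_todo, item = self.Q.get()
            if not work_todo:
                break
            self.DQ.put((True, item, self.handle_input(item)))

    def cleanup(self):
        # Wait for the workers, then tell the iterator to stop.
        for t in self.threads:
            t.join()
        self.DQ.put((False, None, None))

    def run(self):
        self.Q = queue.Queue()
        self.DQ = queue.Queue()
        for _ in range(self._numthreads):
            t = threading.Thread(target=self._handle_input)
            t.start()
            self.threads.append(t)
        try:
            for x in self.get_input():
                self.Q.put((True, x))
        except NotImplementedError as e:
            print(e)
        for _ in range(self._numthreads):
            self.Q.put((False, None))
        threading.Thread(target=self.cleanup).start()


class WordLengths(Threader):
    """Hypothetical subclass: supplies its own input and per-item work."""

    def get_input(self):
        return ["spam", "eggs", "bacon"]

    def handle_input(self, word):
        return len(word)


# The iterator yields (input, output) pairs, so dict() can consume it directly.
lengths = dict(WordLengths(numthreads=2))
```

The subclass form is mostly useful when the input source or the per-item work needs state of its own. For a plain function and a list, passing func and data to the constructor does the same job.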
 
