Running a Python farm


Ian McConnell

What's the pythonic way of sending out a set of requests in parallel?

My program throws an image at the server and then waits for the result. I'm
currently using this bit of socket code to send an image to a server on
another machine.


import socket
import SocketServer
import cPickle

def oneclient(host, port, array):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    sent = sock.sendall(cPickle.dumps(array, 0))
    sock.shutdown(1)
    rfile = sock.makefile('rb')
    return cPickle.loads(rfile.read())
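
(The server side isn't shown above; presumably it's a SocketServer handler along
these lines, which reads the pickled array until the client shuts down its
sending half, processes it, and pickles the result back. process_image() is
just a placeholder for the real work and port 10000 is an assumption.)

import SocketServer
import cPickle

def process_image(array):
    # placeholder for the real (minutes-long) image processing
    return array

class ImageHandler(SocketServer.StreamRequestHandler):
    def handle(self):
        # the client calls sock.shutdown(1), so read() sees EOF here
        array = cPickle.loads(self.rfile.read())
        self.wfile.write(cPickle.dumps(process_image(array), 0))

if __name__ == '__main__':
    SocketServer.TCPServer(('', 10000), ImageHandler).serve_forever()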


Processing the image can take several minutes and I have loads of images to
process, but I also have several servers available, so I'd like to save some
time by distributing the images around the servers. So for 'n' servers,
throw 'n' images at them. Then as each server finishes, give it another
image to work on until I've done all the images.

What should I be looking at for a pythonic solution? I'm stuck on
terminology: threads? Load balancing? Processor farms? Any pointers or
suggestions welcome.



I don't have to do this too often, so I would prefer a simple solution over
performance.


Thanks.
 

Andrew Bennetts

What's the pythonic way of sending out a set of requests in parallel?

My program throws an image at the server and then waits for the result. I'm
currently using this bit of socket code to send an image to a server on
another machine.


import socket
import SocketServer
import cPickle

def oneclient(host, port, array):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    sent = sock.sendall(cPickle.dumps(array, 0))
    sock.shutdown(1)
    rfile = sock.makefile('rb')
    return cPickle.loads(rfile.read())


Processing the image can take several minutes and I have loads of images to
process, but I also have several servers available, so I'd like to save some
time by distributing the images around the servers. So for 'n' servers,
throw 'n' images at them. Then as each server finishes, give it another
image to work on until I've done all the images.

Probably the simplest way is to have n worker threads, and a main thread
that pushes the images to be processed onto a Queue.Queue instance.

So, your worker thread function might look something like (using your
function above):

def worker(host, port, queue):
    while 1:
        array = queue.get()
        if array is None:
            return
        result = oneclient(host, port, array)
        # do something with the result, perhaps store in a shared
        # list or dictionary?

Then you just need a main function like:

import Queue, threading

servers = [('host1', 1111), ('host2', 2222)] # etc...

def main():
    q = Queue.Queue()
    workers = []
    for host, port in servers:
        t = threading.Thread(target=worker, args=(host, port, q))
        t.start()
        workers.append(t)

    # assume we get arrays to be processed from somewhere
    for array in arrays:
        q.put(array)

    # Tell the worker threads to stop when there's nothing left to do
    for i in range(len(workers)):
        q.put(None)

    # Wait for all the threads to finish
    for t in workers:
        t.join()

    # Do something with the results gathered by the workers
    pass
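
To fill in the "do something with the result" part, one possible variation
(a sketch, not from Andrew's post) is to give the workers a second queue for
results and drain it after the joins; oneclient, servers and arrays are
assumed from above:

import Queue, threading

def worker(host, port, in_q, out_q):
    while 1:
        array = in_q.get()
        if array is None:
            return
        # pair each input with its result so they can be matched up later
        out_q.put((array, oneclient(host, port, array)))

def main():
    in_q = Queue.Queue()
    out_q = Queue.Queue()
    workers = []
    for host, port in servers:
        t = threading.Thread(target=worker, args=(host, port, in_q, out_q))
        t.start()
        workers.append(t)
    for array in arrays:
        in_q.put(array)
    for i in range(len(workers)):
        in_q.put(None)              # one stop sentinel per worker
    for t in workers:
        t.join()
    results = []
    while not out_q.empty():        # safe: all workers have finished
        results.append(out_q.get())
    return results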

-Andrew.
 

John Roth

Ian McConnell said:
What's the pythonic way of sending out a set of requests in parallel?

My program throws an image at the server and then waits for the result. I'm
currently using this bit of socket code to send an image to a server on
another machine.


import socket
import SocketServer
import cPickle

def oneclient(host, port, array):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    sent = sock.sendall(cPickle.dumps(array, 0))
    sock.shutdown(1)
    rfile = sock.makefile('rb')
    return cPickle.loads(rfile.read())


Processing the image can take several minutes and I have loads of images to
process, but I also have several servers available, so I'd like to save some
time by distributing the images around the servers. So for 'n' servers,
throw 'n' images at them. Then as each server finishes, give it another
image to work on until I've done all the images.

What should I be looking at for a pythonic solution? I'm stuck on
terminology: threads? Load balancing? Processor farms? Any pointers or
suggestions welcome.



I don't have to do this too often, so I would prefer a simple solution over
performance.

All the solutions I know of have adequate performance. Andrew's
suggestion of using a thread to handle coordination with each of the
server processors is the conventional textbook approach; there's
much to recommend doing it the way everyone else does.

I've frequently found it simpler to avoid thread coordination problems
(which can get really nasty) by using the select or poll services from the
select module (section 7.3 in the 2.2.3 documentation). This leads to an
event-driven style that is closer to GUI programming than the
procedure-oriented style typical of threads.
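
A rough sketch of that style (not John's code): use select.poll to watch one
socket per server and treat a closed connection as "result complete". It only
handles one image per server, but it shows the shape of the event loop:

import socket, select, cPickle

def farm_out(jobs):
    # jobs: list of ((host, port), array) pairs, one per server
    poller = select.poll()
    pending = {}                    # fileno -> (socket, received chunks)
    for (host, port), array in jobs:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
        sock.sendall(cPickle.dumps(array, 0))
        sock.shutdown(1)
        poller.register(sock, select.POLLIN | select.POLLHUP)
        pending[sock.fileno()] = (sock, [])

    results = []
    while pending:
        for fd, event in poller.poll():
            sock, chunks = pending[fd]
            data = sock.recv(8192)
            if data:
                chunks.append(data)
            else:                   # server closed: result is complete
                poller.unregister(fd)
                del pending[fd]
                sock.close()
                results.append(cPickle.loads(''.join(chunks)))
    return results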

Any way you do it, it's going to complicate your program.

John Roth
 

Brian Kelley

Ian said:
What's the pythonic way of sending out a set of requests in parallel?

My program throws an image at the server and then waits for the result. I'm
currently using this bit of socket code to send an image to a server on
another machine.


import socket
import SocketServer
import cPickle

def oneclient(host, port, array):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    sent = sock.sendall(cPickle.dumps(array, 0))
    sock.shutdown(1)
    rfile = sock.makefile('rb')
    return cPickle.loads(rfile.read())

You don't need threads here. We can leverage the fact that
select.select can poll to see if results are ready. How about something
like:

import socket
import select
import cPickle

class Client:
    def __init__(self, host, port, array):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock = sock
        self.host = host
        self.port = port
        sock.connect((host, port))
        sent = sock.sendall(cPickle.dumps(array, 0))
        sock.shutdown(1)
        self.rfile = sock.makefile('rb')

    def result(self):
        # poll (with a one-second timeout) to see whether the reply is ready
        r, w, e = select.select([self.rfile], [], [], 1)
        if not r:
            return None
        else:
            return cPickle.loads(self.rfile.read())

all_images = [...]
hosts = [(host, port), (host1, port1)]
client_list = []
# assign images to hosts
for host, port in hosts:
    client_list.append(Client(host, port, all_images.pop()))

# process
while client_list:
    next_client_list = []
    for o in client_list:
        res = o.result()
        if res is not None:
            # do something with image result
            if all_images:  # if we have images assign to completed host
                next_client_list.append(Client(o.host, o.port,
                                               all_images.pop()))
        else:
            next_client_list.append(o)
    client_list = next_client_list
 

Hung Jung Lu

Ian McConnell said:
Processing the image can take several minutes and I have loads of images to
process, but I also have several servers available, so I'd like to save some
time by distributing the images around the servers. So for 'n' servers,
throw 'n' images at them. Then as each server finishes, give it another
image to work on until I've done all the images.

That's all fine. In the short term, that's the way to get things done.
You may also want to look at xmlrpclib.
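
For example, the xmlrpclib route is only a few lines on each side. This is
just a sketch: process_image() stands in for the real work, port 8000 is
picked arbitrarily, and large binary data would want xmlrpclib.Binary or
pickling:

# --- server ---
from SimpleXMLRPCServer import SimpleXMLRPCServer

def process_image(data):
    # placeholder for the real image processing
    return data

server = SimpleXMLRPCServer(('', 8000))
server.register_function(process_image, 'process_image')
server.serve_forever()

# --- client ---
import xmlrpclib

proxy = xmlrpclib.ServerProxy('http://host1:8000')
# a real call would send the image data (e.g. wrapped in xmlrpclib.Binary)
result = proxy.process_image([1, 2, 3])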

For the longer term, you may want to do some extra decoupling to make things
more scalable/maintainable/fault-tolerant. A few ideas I got recently:

(1) Prepare tasks well enough that they can run "off-line" in a pool; you
may use GUIDs (globally unique ids) to tag the tasks (see the sketch below).

(2) Do remote-to-local calls instead of local-to-remote calls. This means
you need broadcast capability.

(3) Write your own task manager, installable on all machines (local and
remote), so that you can dispatch programs and data files more easily and
start and stop remote services. For instance, you may have a remote server
to do image processing now, but what if in the future you want servers to
do sound file processing? What if you need to upgrade your remote programs?
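
A small sketch of idea (1): tag each task with a GUID so results can be
matched back to their tasks no matter which machine answers first. The id
scheme and the dictionaries here are only illustrative, and all_images/opts
are assumed from the earlier posts:

import md5, time, random

def make_guid():
    # illustrative only; any scheme yielding globally unique ids will do
    return md5.new('%s %s %s' % (time.time(), random.random(),
                                 id({}))).hexdigest()

tasks = {}                          # guid -> task description
for image in all_images:
    guid = make_guid()
    tasks[guid] = {'guid': guid, 'image': image, 'opts': opts}

# ... tasks can now sit in a pool and be dispatched to whichever machine
# is free; a returning (guid, result) pair is matched up with:

def handle_result(guid, result):
    task = tasks.pop(guid)
    # record result against task['image'] here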

regards,

Hung Jung
 

Ian McConnell

Ian McConnell said:
What's the pythonic way of sending out a set of requests in parallel?

My program throws an image at the server and then waits for the result. I'm
currently using this bit of socket code to send an image to a server on
another machine.

Once again, thanks to all those who replied.

In the end I went with the select and poll method, as I only have a small
number of machines to run my code on and it fitted most easily into my
existing program.

Actually, it also turns out that my code isn't quite as parallelised as I
thought it was and having results coming back in a different order confused
some of the later bookkeeping, so I've also done a version (pclient2) that
maintains the image order. Even with this limitation, I do get a good speed
up.

I'm sure this code can be made more efficient and is probably fairly easy to
deadlock, but it works for me.




import sys
import socket
import cPickle
import select
import string

class Machine:
    def __init__(self, hostname):
        colon = string.rfind(hostname, ':')
        if colon >= 0:
            self.host = hostname[0:colon]
            self.port = int(hostname[colon+1:])
        else:
            self.host = hostname
            self.port = 10000  # default port
        # Should check for host being alive and possible endian issues
    def connect(self):
        return (self.host, self.port)
    def __repr__(self):
        return "%s:%d" % (self.host, self.port)

class Client:
    def __init__(self, host, array, opts):
        print 'NEW CLIENT', host, array.shape
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.host = host
        sock.connect(host.connect())
        sent = sock.sendall(cPickle.dumps((array, opts), 0))
        sock.shutdown(1)
        self.rfile = sock.makefile('rb')

    def result(self):
        r, w, e = select.select([self.rfile], [], [], 1)
        if not r:
            return None
        else:
            return cPickle.loads(self.rfile.read())
    def __repr__(self):
        return "%s" % self.host
    def name(self):
        return self.host


def pclient(hostnames, all_images, opts):
    """
    hostnames is a list of machine names (as host:port) that are willing
    to process data
    all_images is a list of images
    opts is a dictionary of processing options.
    """

    client_list = []
    result = []
    # assign images to hosts
    hosts = []
    while len(hostnames) > 0:
        host = Machine(hostnames.pop())
        if host is not None:
            hosts.append(host)
    for host in hosts:
        client_list.append(Client(host, all_images.pop(), opts))

    # process
    while client_list:
        print len(client_list), 'clients busy'
        next_client_list = []
        for o in client_list:
            res = o.result()
            if res is not None:
                result.append(res)
                if all_images:  # if we have images assign to completed host
                    next_client_list.append(Client(o.host, all_images.pop(),
                                                   opts))
            else:
                next_client_list.append(o)
        client_list = next_client_list
    return result



#
#
#
# Extra code to ensure results return in same order as submitted.
#
#
#


class fifo:
    """Simple implementation of a First-In-First-Out structure."""
    def __init__(self):
        self.in_stack = []
    def push(self, obj):
        self.in_stack.append(obj)
        # print 'fifo push', self.in_stack
    def pop(self):
        return self.in_stack.pop(0)
    def __repr__(self):
        return str(self.in_stack)
    def __len__(self):
        return len(self.in_stack)
    def head(self):
        return self.in_stack[0]

class Workers:
    def __init__(self, hosts, opts):
        self.idle = hosts
        self.busy = fifo()
        self.opts = opts

    def free(self):
        return len(self.idle)

    def newjob(self, array):
        if array == [] or array == None:
            # Delete idle list
            self.idle = []
            return
        host = self.idle.pop()
        self.busy.push(Client(host, array, self.opts))

    def poll(self):
        if len(self.busy) == 0:
            return None
        host = self.busy.head()
        return host.result()

    def done(self):
        if len(self.busy) == 0:
            return None
        res = None
        while res == None:
            res = self.poll()
        host = self.busy.pop()
        self.idle.append(host.name())
        print ' idle:', self.idle, ' busy:', self.busy
        return res

def pclient2(hostnames, all_images, opts):
    """
    hostnames is a list of machine names (as host:port) that are willing
    to process data
    all_images is a list of images
    opts is a dictionary of processing options.
    Returns results in same order as input.
    """
    result = []
    hosts = []
    while len(hostnames) > 0:
        host = Machine(hostnames.pop())
        if host is not None:
            hosts.append(host)
    workers = Workers(hosts, opts)

    while workers.free() > 0:
        workers.newjob(all_images.pop())

    print 'PCLIENT2 idle:', workers.idle, ' busy:', workers.busy
    while 1:
        res = workers.done()
        if res == None:
            break
        result.append(res)
        # Queue up another job if there are images left to process
        if all_images:
            workers.newjob(all_images.pop())
    return result
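
For reference, a hypothetical call might look like this (the host names and
options are made up, and all_images would be a list of image arrays loaded
elsewhere; pclient2 pops from both lists as it dispatches work):

hostnames = ['server1:10000', 'server2:10000', 'server3']
opts = {}                      # processing options
results = pclient2(hostnames, all_images, opts)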
 
