corey.coughlin
Alright, so I've been following some of the arguments about enhancing
parallelism in python, and I've been struck by how hard things still
are. It seems like what we really need is a more pythonic approach.
One thing I've been seeing suggested a lot lately is running jobs in
separate processes, to make it easy to use the latest multiprocessor
machines. That makes a lot of sense to me; those processors are only
going to get more common as time goes on. But it would also be nice
if the same mechanism could make standard threading a little easier
and more trouble-free. I'm not seeing an easy way to do that within
the current constraints of the language, so it seems like we're going
to need some kind of language improvement. With that in mind, I
started thinking about what the problem would look like in a more
idealized sense. It would be nice to abstract the concept of running
something in parallel into something that can be easily customized,
is flexible enough to use in a variety of contexts, and is reasonably
hard to screw up and fairly easy to use. Perhaps a new built-in type
might be just the trick. Consider a new suite:
pardef <Name>(self, <par type>, arguments...):
    <dest pardef>.send(<tag>, arguments)
    arguments = self.receive(<tag>)
    return arguments
    yield arguments
So the object would be something you can create an instance of and
set up like a normal object, and it would have other interface
functions as well. Consider your basic vector add operation:
import concurrent
import array

pardef vecadd(self, concurrent.subprocess, veca, vecb, arrtype):
    import array    # pardefs see nothing from the enclosing scope
    output = array.array(arrtype)
    for a, b in zip(veca, vecb):
        output.append(a + b)
    return output

a = array.array('d')
b = array.array('d')
for i in range(1000):
    a.append(float(i))
    b.append(float(i))
h1 = vecadd(a[:500], b[:500], 'd')
h2 = vecadd()
h2.veca = a[500:]
h2.vecb = b[500:]
h2.arrtype = 'd'
h1.run()
h2.run()
c = h1.return() + h2.return()
You can see a few things in this example. First off, you'll notice
that vecadd has the import for array inside it. One of the most
important things about the pardef is that it must not inherit
anything from the global scope; all variable passing must occur
through either the arguments or .receive calls. You'll also notice
that it's possible to set the arguments like instance values. That
isn't as important in this case, but it could be very useful for
setting arguments for other pardefs. Take this example of your basic
SIMD-ish diffusion simulation:
import concurrent

pardef diffcell(self, concurrent.subprocess, right, left, up, down,
                initval):
    # each neighbor is either a fixed boundary value or another pardef
    current = initval
    maxruns = 100
    updef = not isinstance(up, (int, float))
    downdef = not isinstance(down, (int, float))
    rightdef = not isinstance(right, (int, float))
    leftdef = not isinstance(left, (int, float))
    for i in range(maxruns):
        # send the current value out first so that neighboring cells
        # don't deadlock waiting on each other's receives
        if updef:
            up.send('down', current)
        if downdef:
            down.send('up', current)
        if rightdef:
            right.send('left', current)
        if leftdef:
            left.send('right', current)
        if updef:
            upval = self.receive('up')
        else:
            upval = up
        if downdef:
            downval = self.receive('down')
        else:
            downval = down
        if rightdef:
            rightval = self.receive('right')
        else:
            rightval = right
        if leftdef:
            leftval = self.receive('left')
        else:
            leftval = left
        current = (upval + downval + leftval + rightval) / 4
    return current

diffgrid = {}
for x in range(10):
    for y in range(10):
        diffgrid[(x, y)] = diffcell()
for x in range(10):
    for y in range(10):
        gridpt = diffgrid[(x, y)]
        gridpt.initval = 50.0
        if x == 0:
            gridpt.left = 75.0
        else:
            gridpt.left = diffgrid[(x-1, y)]
        if x == 9:
            gridpt.right = 50.0
        else:
            gridpt.right = diffgrid[(x+1, y)]
        if y == 0:
            gridpt.down = 0.0
        else:
            gridpt.down = diffgrid[(x, y-1)]
        if y == 9:
            gridpt.up = 100.0
        else:
            gridpt.up = diffgrid[(x, y+1)]
for coord in diffgrid:
    diffgrid[coord].run()
for x in range(10):
    for y in range(10):
        print '(%i, %i) = %f' % (x, y, diffgrid[(x, y)].return())
Now sure, this example is a little contrived, but it shows the utility
of allowing the input parameters to be set after instantiating the
pardef. You can also imagine that this would be useful for running a
single pardef over and over again with different arguments.
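For comparison, here's roughly what the first vecadd example looks
like with the threading module we already have (just a sketch with
names of my own choosing; under the current interpreter the two
threads won't actually run on separate processors, which is part of
the motivation here):
import array
import threading

def vecadd(veca, vecb, arrtype, out, slot):
    # plain worker function; the result lands in a shared list slot
    res = array.array(arrtype)
    for x, y in zip(veca, vecb):
        res.append(x + y)
    out[slot] = res

a = array.array('d', [float(i) for i in range(1000)])
b = array.array('d', [float(i) for i in range(1000)])
out = [None, None]
t1 = threading.Thread(target=vecadd, args=(a[:500], b[:500], 'd', out, 0))
t2 = threading.Thread(target=vecadd, args=(a[500:], b[500:], 'd', out, 1))
t1.start()
t2.start()
t1.join()   # join() is the closest existing thing to a blocking .return()
t2.join()
c = out[0] + out[1]
The out list and the explicit joins are exactly the boilerplate a
pardef handle would hide.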
Remember that pardefs don't inherit any data from the global scope.
Data is only passed in through receive statements and the arguments.
The <par type> would control how the send and receive functions work,
and how return and yield pass data back. In a way, the pardef works
something like a built-in type, and something like an interface.
Because of that, it could be used to implement different kinds of
parallelism: threads, processes, and conceivably clustering as well.
I suspect that the easiest way to implement it would be to use strict
data copying for send and receive, and to only allow shared data
through the arguments, but it would be best to leave that choice to
the <par type> implementation. This would in effect make the type
more of an interface than anything else. Since parallel processing is
complicated, this would allow for a number of different approaches
while still keeping them all within the same framework, creating
consistency between them. You could test with a safe version of
threading, then switch to a less safe but higher-performance version
if you need to, and so on. Ultimately the intent of the code would
stay obvious without getting bogged down in details.
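To make "strict data copying" concrete, here's one way a <par type>
could implement its message channels with what exists today: a
minimal sketch, assuming one queue per pardef and a deep copy on
every send (the class and its names are hypothetical):
import copy
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

class CopyingChannel(object):
    # Hypothetical channel with copy-on-send semantics: the receiver
    # gets its own copy, so neither side sees the other's mutations.
    def __init__(self):
        self._q = queue.Queue()

    def send(self, tag, value):
        # non-blocking: enqueue a deep copy under the given tag
        self._q.put((tag, copy.deepcopy(value)))

    def receive(self, tag):
        # blocking; a real implementation would demultiplex by tag
        # rather than assume messages arrive in the order requested
        msgtag, value = self._q.get()
        return value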
So to be specific, the interface to this object would consist of at
least the following functions:
.run() -> runs the code, or restarts it from a yield; non-blocking
.return() -> gets the return or yield value of the code; blocking
.send(<label>, values...) -> sends a value into the code; non-blocking
.receive(<label>) -> gets a new value inside the code; blocking
.kill() -> kills the running code; blocking
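None of this exists yet, but most of the interface can be mocked up
with today's threads and queues. A rough emulation under stated
assumptions (all names are mine, not a real API); note that 'return'
is a reserved word at the moment, so the sketch calls its getter
result(), and kill() is omitted because current Python threads can't
be killed safely:
import copy
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

class ParHandle(object):
    # Hypothetical stand-in for a pardef instance backed by a thread.
    def __init__(self, func, *args):
        self._func = func
        self._args = args
        self._inbox = queue.Queue()    # values sent into the code
        self._outbox = queue.Queue()   # return/yield values coming out

    def run(self):
        # non-blocking: start the code on its own thread
        t = threading.Thread(target=self._worker)
        t.daemon = True
        t.start()

    def _worker(self):
        self._outbox.put(self._func(self, *self._args))

    def result(self):
        # blocking: stands in for the proposed .return()
        return self._outbox.get()

    def send(self, tag, value):
        # non-blocking, with copy-on-send as suggested above
        self._inbox.put((tag, copy.deepcopy(value)))

    def receive(self, tag):
        # blocking; a real version would match messages by tag
        msgtag, value = self._inbox.get()
        return value
So ParHandle(lambda self, x, y: x + y, 2, 3) followed by .run() and
.result() would hand back 5.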
Now sure, I suppose there could be other methods available, but I
can't think of any other good ones offhand. But anyway, that's pretty
much what I was thinking of, more or less. Of course, implementing it
all would be more than a little tricky. For one, all the standard
libraries that access limited system resources, like files, would
have to be reworked. For instance, 'print', which writes to stdout,
would probably have to be rewritten as something like this:
import concurrent
import sys

pardef parprint(self, concurrent.systhread, outfile):
    while True:
        newmessage = self.receive('message')
        outfile.write('%s\n' % newmessage)
        outfile.flush()    # may not be necessary

printp = parprint(sys.__stdout__)
printp.run()
Then every 'print' call would need to be intercepted and recast as
something like this:
printp.send('message', outstr)
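For what it's worth, that pattern is achievable right now: a single
consumer thread draining a queue serializes output from any number of
producers. A minimal sketch along those lines (the names are mine):
import sys
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def print_server(outfile, inbox):
    # lone writer: everything funnels through one thread, so lines
    # from different producers never interleave mid-write
    while True:
        message = inbox.get()
        if message is None:    # sentinel tells the server to stop
            break
        outfile.write('%s\n' % message)
        outfile.flush()

inbox = queue.Queue()
server = threading.Thread(target=print_server, args=(sys.stdout, inbox))
server.start()
inbox.put('hello from a worker')   # the moral equivalent of printp.send()
inbox.put(None)
server.join()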
Of course, this kind of rewrite would only be needed for programs
that use concurrency. I suppose you could have some kind of switch
that gets set in the parser to detect pardefs and use concurrent
libraries for those, and standard ones otherwise. In fact, if we
wanted to ease into the implementation, it might be simpler to let
the parser detect when a parallel version of a library isn't
available, so that unimplemented libraries would give an error. For
most libraries that don't use scarce system resources, a
parallel-compatible version could be added easily.
There could be some issues with this that I haven't thought of,
though. There may be some forms of concurrency that wouldn't work too
well with this scheme. It would take another keyword, and that usually
seems to be trouble. I'm not really even that fond of 'pardef', but
the alternatives I can think of (concurrentdef, parallelblock,
threaddef, processdef, thread, process, etc.) seem worse. Be sure to
chime in if you have any suggestions for a better one. There is also
no existing implementation for this; I thought I'd run it by everyone
here first to see if it would even have a chance of being accepted.
If most people like it, I guess I'll get started on an implementation
and a PEP and all that. So, what do you all think?