Persistent variable in subprocess using multiprocessing?


mheavner

I'm using multiprocessing to spawn several subprocesses, each of which
uses a very large data structure (making it impractical to pass it via
pipes / pickling). I need to allocate this structure once when the
process is created and have it remain in memory for the duration of
the process. The way the multiprocessing module is set up, only the
'run' method runs within the subprocess - so creating a wrapper class
with a constructor that allocates the structure in __init__ will not
work, as far as I know, as this will still be within the parent
process.

If I were working in C/C++, I would declare the variable "static"
within the function body - is there any way with the multiprocessing
module to have persistent data members within subprocesses?
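In other words, I want the moral equivalent of this sketch, where
load_structure() is a made-up name for whatever expensive call builds the
thing:

_structure = None   # module-level, i.e. one per process, like a C static

def get_structure():
    global _structure
    if _structure is None:            # only the first call in a process pays
        _structure = load_structure() # hypothetical expensive loader
    return _structure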

Any ideas??

Thanks,
Matt
 

Diez B. Roggisch

mheavner said:
I'm using multiprocessing to spawn several subprocesses, each of which
uses a very large data structure (making it impractical to pass it via
pipes / pickling). I need to allocate this structure once when the
process is created and have it remain in memory for the duration of
the process. The way the multiprocessing module is set up, only the
'run' method runs within the subprocess - so creating a wrapper class
with a constructor that allocates the structure in __init__ will not
work, as far as I know, as this will still be within the parent
process.

If I were working in C/C++, I would declare the variable "static"
within the function body - is there any way with the multiprocessing
module to have persistent data members within subprocesses?

Works for me, at least under OS X (and I presume *nixes in general do as
well). No idea about Windows.


The thing to keep in mind is that forking is used, and that
interpreter-state up to the moment of the fork is the same for all
subprocesses.



from multiprocessing import Process

class MyProcess(Process):

    def __init__(self, huge_shared_state):
        # runs in the parent; the child inherits the state via fork
        self.huge_shared_state = huge_shared_state
        super(MyProcess, self).__init__()

    def run(self):
        # runs in the subprocess; the structure is already in memory here
        print self.name, len(self.huge_shared_state)


shared_state = range(1000000)

processes = []
for i in xrange(10):
    p = MyProcess(shared_state)
    p.start()
    processes.append(p)

for p in processes:
    p.join()



Diez
 

Piet van Oostrum

mheavner said:
m> I'm using multiprocessing to spawn several subprocesses, each of which
m> uses a very large data structure (making it impractical to pass it via
m> pipes / pickling). I need to allocate this structure once when the
m> process is created and have it remain in memory for the duration of
m> the process. The way the multiprocessing module is set up, only the
m> 'run' method runs within the subprocess - so creating a wrapper class
m> with a constructor that allocates the structure in __init__ will not
m> work, as far as I know, as this will still be within the parent
m> process.
m> If I were working in C/C++, I would declare the variable "static"
m> within the function body - is there any way with the multiprocessing
m> module to have persistent data members within subprocesses?
m> Any ideas??

Your post is not entirely clear. Is `the process' the same as `the
subprocess'?

Assuming it is, what is the problem? You can create the data structure
first thing in the run method, can't you?

Like this:

from multiprocessing import Process
from time import sleep
from random import random

class MyProcess(Process):

    def __init__(self, number):
        self.number = number
        Process.__init__(self)

    def run(self):
        # runs in the subprocess; self.data lives here for the process lifetime
        print "Process %s started" % self.number
        self.data = range(self.number * 100000, (self.number + 1) * 100000)
        self.doit()

    def doit(self):
        for i in range(5):
            sleep(3 * random())
            self.data.append(i)
            print self.data

processes = []
for k in range(10):
    p = MyProcess(k)
    p.start()
    processes.append(p)

for p in processes:
    p.join()
 

mheavner

Piet van Oostrum said:
Assuming it is, what is the problem? You can create the data structure
first thing in the run method, can't you?


'The process' refers to the subprocess. I could do as you say and load
the data structure each time, but the problem is that this takes a
considerable amount of time compared to the actual computation
with the data it contains. I'm using these processes within a loop as
follows:

# Don't recreate processes or Queues
pop1 = Queue()
pop2 = Queue()
pop_out = Queue()
p1 = CudaProcess(0, args=(costf,pop1,pop_out))
p2 = CudaProcess(1, args=(costf,pop2,pop_out))

# Main loop
for i in range(maxiter):
    print 'ITERATION: '+str(i)
    if log != None:
        l = open(log,'a')
        l.write('Iteration: '+str(i)+'\n')
        l.close()

    # Split population in two
    pop1.putmany(pop[0:len(pop)/2])
    pop2.putmany(pop[len(pop)/2:len(pop)])

    # Start two processes
    if not p1.isAlive():
        p1.start()
        print 'started %s'%str(p1.getPid())
    else:
        p1.run()
    if not p2.isAlive():
        p2.start()
        print 'started %s'%str(p2.getPid())
    else:
        p2.run()
    .
    .
    .

So I'd like to load that data into memory once and keep it there as long
as the process is alive (ideally when the subprocess is created,
storing some sort of pointer to it), rather than loading it each time
run is called for a process within the loop. It could be my CudaProcess
class - I'll check out what Diez suggested and post back.
 

mheavner

Essentially, I'd like to "sneak" that allocation in somewhere after
the fork is done (in start()), in the context of the subprocess,
holding a pointer to that structure, but before any of the run() calls
happen.
 

Piet van Oostrum

mheavner said:
m> 'The process' refers to the subprocess. I could do as you say and load
m> the data structure each time, but the problem is that this takes a
m> considerable amount of time compared to the actual computation
m> with the data it contains. I'm using these processes within a loop as
m> follows:
m> [...]
m>          if not p1.isAlive():
m>                  p1.start()
m>                  print 'started %s'%str(p1.getPid())
m>          else:
m>                  p1.run()

That won't work. p1.run() will execute the run method in the master
process, not in the subprocess. And even if it did, your code would have a
race condition: between the p1.isAlive() (which must be is_alive, btw) and
the p1.run(), the process can have stopped.

The proper way to do it is to put the work in a Queue and let the processes
get work out of the Queue. The data structure will then remain in the
process.
 

mheavner

I realize that the Queue would be the best way of doing this; however,
that involves transferring the huge amount of data for each call - my
hope was to transfer it once and have it remain in memory for the
subprocess across run() calls.

 

Piet van Oostrum

mheavner said:
m> I realize that the Queue would be the best way of doing this; however,
m> that involves transferring the huge amount of data for each call - my
m> hope was to transfer it once and have it remain in memory for the
m> subprocess across run() calls.

Which huge amount of data? The data structure you talked about can remain
in the process. You only have to transfer the input for your calculation
through the queue, but you have to do that anyway. And there is only one
run call per process. When run has terminated, the process exits, so you
would have a loop in the run() method getting work from the queue.
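
Schematically something like this - just a sketch, where load_structure()
and compute() are stand-ins for your own loading and computation code:

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, work_queue, result_queue):
        Process.__init__(self)
        self.work_queue = work_queue
        self.result_queue = result_queue

    def run(self):
        # executed in the subprocess: build the big structure exactly once
        data = load_structure()            # stand-in for your expensive setup
        while True:
            item = self.work_queue.get()   # blocks until work arrives
            if item is None:               # sentinel from the parent: shut down
                break
            # only small work items and results cross the queue;
            # 'data' stays in this process for its whole lifetime
            self.result_queue.put(compute(data, item))

The parent just puts work items in work_queue as they come up, and finally
one None per worker to shut them down.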
 

Wai Yip

I think Diez's example shows this works automatically on Unix. In my case
I use Windows, so I use multiprocessing.Array to share data in shared
memory. multiprocessing.Array has the limitation that it can only hold
simple C data types, not Python objects, though.
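
A minimal sketch of what I mean (values made up; 'd' is the typecode for
C doubles):

from multiprocessing import Process, Array

def worker(shared):
    # 'shared' lives in shared memory, so no copy is pickled per process
    print len(shared), shared[0]

if __name__ == '__main__':
    # a flat array of C doubles; only simple typecodes such as 'd' or 'i' work
    arr = Array('d', range(1000))
    procs = [Process(target=worker, args=(arr,)) for _ in xrange(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()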

Wai Yip Tung
 

Piet van Oostrum

There is still something not clear in your description.
m> I'm using multiprocessing to spawn several subprocesses, each of which
m> uses a very large data structure (making it impractical to pass it via
m> pipes / pickling). I need to allocate this structure once when the
m> process is created and have it remain in memory for the duration of
m> the process.

I read this as meaning that every subprocess has its own large
data structure and that there is no connection between them.

But seeing where the discussion is going, I guess there might be
different interpretations. So can you enlighten us as to what the
situation is?

1. Each subprocess has a copy of a data structure that is prepared by the
master process. Therefore you want it to be passed by the fork.
1a. The data structure is constant, i.e. the subprocess doesn't change it.
1b. The subprocess makes changes in its copy.
2. Each subprocess has a separate data structure, not equal to the others.
3. Something else.
 

mheavner

Piet,

The situation is 1a of your listed options; however, my issue is
solved. I was stopping the subprocesses from consuming more data at
each iteration, which led to the data being lost since the subprocess
worker function would then end. I now keep them alive across
iterations, as sketched below.
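
Roughly like this - a sketch reusing the names from my earlier snippet
(CudaProcess and putmany() are from my own code, not the stdlib):

pop1, pop2, pop_out = Queue(), Queue(), Queue()
p1 = CudaProcess(0, args=(costf, pop1, pop_out))
p2 = CudaProcess(1, args=(costf, pop2, pop_out))
p1.start()   # started once; run() allocates the structure, then loops on its queue
p2.start()

for i in range(maxiter):
    # feed the already-running workers instead of restarting them
    pop1.putmany(pop[0:len(pop)/2])
    pop2.putmany(pop[len(pop)/2:len(pop)])
    # ... collect this iteration's results from pop_out ...

pop1.put(None)   # sentinels make run() fall out of its loop
pop2.put(None)
p1.join()
p2.join()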

Thanks for your help, I'm new to the multiprocessing module and this
was very helpful!
 
