C
castironpi
I have a data structure I think would be very useful. It passes a few
test cases, but one attempt to optimize it failed, so that may
indicate a bug. Will anyone help me debug it, verify it, or clean it?
It pertains to multi-threading and is a synchro. structure. If it is
not an interest of yours, please let it be someone else's. It's an
augmented Lock that I described a few weeks ago (different from the
FlowStep structure I was developing with Mr. Bieber).
My goal is to return Deadlock from acquire() if its blocking would
directly create deadlock. Basic example:
thread-1 acquires lockA
thread-2 acquires lockB
thread-1 calls to acquire lockB and blocks
thread-2 calls to acquire lockA and blocks
and neither can ever continue. There is enough information to deny
thread-2's request, and return at once with failure. The method is
already somewhat treated in other structures, but I did not find one
for Python.
thread-2 calls to acquire lockA and returns Deadlock flag.
I would appreciate a half-hour and a thought or observation you make,
bugs you spot, &c. It lacks a blocking argument to acquire;
CustThread and Trylock should garbage collect themselves (note the set
of all); Trylock().lock should be an RLock; and I shouldn't need the
CustThread class at all, if anyone's going to publish this, but it can
be used for free right now.
It's runs in Python 3.0 and 2.5.
The current test consists of 6 threads randomly acquiring 20 locks and
infinitely looping. Thanks in advance.
from __future__ import with_statement
import threading
import thread
from time import sleep
from collections import deque
class CustThread:
count= 0
all= {}
def __init__( self, threadid ):
assert threadid not in CustThread.all
CustThread.count+= 1
self.id= CustThread.count
self.has= set()
self.waits= set()
def __repr__( self ):
return '<CustThread %i>'% self.id
def s( self ):
return self, 'has', self.has, 'waits', self.waits
try:
from collections import namedtuple
Pair= namedtuple( "Pair", "thread lock" )
except:
class Pair:
def __init__( self, thread, lock ):
self.thread, self.lock= thread, lock
Acquires, Blocks, Deadlocks= object(), object(), object()
resultdict= { Acquires: 'Acquires', Blocks: 'Blocks', Deadlocks:
'Deadlocks' }
class Trylock:
count= 0
locks= set()
_threads= {}
_goplock= threading.Lock()
def __init__( self, *ar, **kwar ):
#self.lock= threading.RLock()
self.lock= threading.Lock()
self._oplock= threading.Lock()
Trylock.locks.add( self )
self.id= Trylock.count
Trylock.count+= 1
def __repr__( self ):
return '<Trylock %i>'% self.id
def acquire( self ):
callerid= thread.get_ident()
with Trylock._goplock:
caller= Trylock._threads.get( callerid )
if caller is None:
caller= CustThread( callerid )
Trylock._threads[ callerid ]= caller
with Trylock._goplock:
if self._cycles():
return Deadlocks
caller.waits.add( self )
ret= self.lock.acquire()
with Trylock._goplock:
caller.waits.remove( self )
caller.has.add( self )
return Acquires
def release( self ):
with self._oplock:
has= [ th for th in Trylock._threads.values() if self in
th.has ]
assert len( has )== 1
has[0].has.remove( self )
self.lock.release()
def __enter__( self ):
if not self.acquire():
raise Exception( 'Deadlock' )
def __exit__( self, t, v, tb ):
self.release()
def _cycles( self ):
inth= Trylock._threads[ thread.get_ident() ]
inlock= self
edges= [ Pair( th, ck ) for th in Trylock._threads.values()
for ck in th.has| th.waits ]
inpair= Pair( inth, inlock )
edges.append( inpair )
d= deque( [ e for e in edges if e.lock is inlock ] )
while d:
cur= d.popleft()
locks= [ e.lock for e in edges if e.thread is cur.thread
and e.lock is not cur.lock ]
for ck in locks:
nexts= [ e for e in edges if ck is e.lock and e.thread
is not cur.thread ]
if inpair in nexts: return True
d.extend( nexts )
return False
def main( func ):
if __name__== '__main__':
func()
@main
def fmain():
import random
locks= [ Trylock() for _ in range( 20 ) ]
def th1( i ):
while 1:
lock= random.choice( locks )
ret= lock.acquire()
if ret is not Acquires:
continue
print( '%i th lock %s acquire\n'% ( i, lock ) ),
sleep( .0001 )
lock2= random.choice( locks )
if lock2 is lock:
pass
elif lock2.acquire() is Acquires:
print( '%i th lock2 %s acquire\n'% ( i, lock ) ),
sleep( .0001 )
lock2.release()
lock.release()
print( '%i th lock %s release\n'% ( i, lock ) ),
ths= [ threading.Thread( target= th1, args= ( i, ) ) for i in
range( 6 ) ]
[ th.start() for th in ths ]
test cases, but one attempt to optimize it failed, so that may
indicate a bug. Will anyone help me debug it, verify it, or clean it?
It pertains to multi-threading and is a synchro. structure. If it is
not an interest of yours, please let it be someone else's. It's an
augmented Lock that I described a few weeks ago (different from the
FlowStep structure I was developing with Mr. Bieber).
My goal is to return Deadlock from acquire() if its blocking would
directly create deadlock. Basic example:
thread-1 acquires lockA
thread-2 acquires lockB
thread-1 calls to acquire lockB and blocks
thread-2 calls to acquire lockA and blocks
and neither can ever continue. There is enough information to deny
thread-2's request, and return at once with failure. The method is
already somewhat treated in other structures, but I did not find one
for Python.
thread-2 calls to acquire lockA and returns Deadlock flag.
I would appreciate a half-hour and a thought or observation you make,
bugs you spot, &c. It lacks a blocking argument to acquire;
CustThread and Trylock should garbage collect themselves (note the set
of all); Trylock().lock should be an RLock; and I shouldn't need the
CustThread class at all, if anyone's going to publish this, but it can
be used for free right now.
It's runs in Python 3.0 and 2.5.
The current test consists of 6 threads randomly acquiring 20 locks and
infinitely looping. Thanks in advance.
from __future__ import with_statement
import threading
import thread
from time import sleep
from collections import deque
class CustThread:
count= 0
all= {}
def __init__( self, threadid ):
assert threadid not in CustThread.all
CustThread.count+= 1
self.id= CustThread.count
self.has= set()
self.waits= set()
def __repr__( self ):
return '<CustThread %i>'% self.id
def s( self ):
return self, 'has', self.has, 'waits', self.waits
try:
from collections import namedtuple
Pair= namedtuple( "Pair", "thread lock" )
except:
class Pair:
def __init__( self, thread, lock ):
self.thread, self.lock= thread, lock
Acquires, Blocks, Deadlocks= object(), object(), object()
resultdict= { Acquires: 'Acquires', Blocks: 'Blocks', Deadlocks:
'Deadlocks' }
class Trylock:
count= 0
locks= set()
_threads= {}
_goplock= threading.Lock()
def __init__( self, *ar, **kwar ):
#self.lock= threading.RLock()
self.lock= threading.Lock()
self._oplock= threading.Lock()
Trylock.locks.add( self )
self.id= Trylock.count
Trylock.count+= 1
def __repr__( self ):
return '<Trylock %i>'% self.id
def acquire( self ):
callerid= thread.get_ident()
with Trylock._goplock:
caller= Trylock._threads.get( callerid )
if caller is None:
caller= CustThread( callerid )
Trylock._threads[ callerid ]= caller
with Trylock._goplock:
if self._cycles():
return Deadlocks
caller.waits.add( self )
ret= self.lock.acquire()
with Trylock._goplock:
caller.waits.remove( self )
caller.has.add( self )
return Acquires
def release( self ):
with self._oplock:
has= [ th for th in Trylock._threads.values() if self in
th.has ]
assert len( has )== 1
has[0].has.remove( self )
self.lock.release()
def __enter__( self ):
if not self.acquire():
raise Exception( 'Deadlock' )
def __exit__( self, t, v, tb ):
self.release()
def _cycles( self ):
inth= Trylock._threads[ thread.get_ident() ]
inlock= self
edges= [ Pair( th, ck ) for th in Trylock._threads.values()
for ck in th.has| th.waits ]
inpair= Pair( inth, inlock )
edges.append( inpair )
d= deque( [ e for e in edges if e.lock is inlock ] )
while d:
cur= d.popleft()
locks= [ e.lock for e in edges if e.thread is cur.thread
and e.lock is not cur.lock ]
for ck in locks:
nexts= [ e for e in edges if ck is e.lock and e.thread
is not cur.thread ]
if inpair in nexts: return True
d.extend( nexts )
return False
def main( func ):
if __name__== '__main__':
func()
@main
def fmain():
import random
locks= [ Trylock() for _ in range( 20 ) ]
def th1( i ):
while 1:
lock= random.choice( locks )
ret= lock.acquire()
if ret is not Acquires:
continue
print( '%i th lock %s acquire\n'% ( i, lock ) ),
sleep( .0001 )
lock2= random.choice( locks )
if lock2 is lock:
pass
elif lock2.acquire() is Acquires:
print( '%i th lock2 %s acquire\n'% ( i, lock ) ),
sleep( .0001 )
lock2.release()
lock.release()
print( '%i th lock %s release\n'% ( i, lock ) ),
ths= [ threading.Thread( target= th1, args= ( i, ) ) for i in
range( 6 ) ]
[ th.start() for th in ths ]