feedback requested

C

castironpi

I have a data structure I think would be very useful. It passes a few
test cases, but one attempt to optimize it failed, so that may
indicate a bug. Will anyone help me debug it, verify it, or clean it?

It pertains to multi-threading and is a synchronization structure. If it is
not an interest of yours, please let it be someone else's. It's an
augmented Lock that I described a few weeks ago (different from the
FlowStep structure I was developing with Mr. Bieber).

My goal is to return Deadlock from acquire() if its blocking would
directly create deadlock. Basic example:

thread-1 acquires lockA
thread-2 acquires lockB
thread-1 calls to acquire lockB and blocks
thread-2 calls to acquire lockA and blocks

and neither can ever continue. There is enough information to deny
thread-2's request, and return at once with failure. The method is
already somewhat treated in other structures, but I did not find one
for Python.

thread-2 calls to acquire lockA and returns Deadlock flag.

I would appreciate a half-hour and a thought or observation you make,
bugs you spot, &c. It lacks a blocking argument to acquire;
CustThread and Trylock should garbage collect themselves (note the set
of all); Trylock().lock should be an RLock; and I shouldn't need the
CustThread class at all, if anyone's going to publish this, but it can
be used for free right now.

It runs in Python 3.0 and 2.5.

The current test consists of 6 threads randomly acquiring 20 locks and
infinitely looping. Thanks in advance.

from __future__ import with_statement
import threading
import thread
from time import sleep
from collections import deque

class CustThread:
    """Per-thread bookkeeping record used by Trylock.

    Each record carries a small sequential ``id`` (used only by
    ``__repr__``) and two sets of locks.  Trylock.acquire adds a lock to
    ``waits`` before blocking on it and moves it to ``has`` once the
    underlying lock has been obtained; Trylock.release removes it from
    ``has``.
    """

    count = 0   # number of records created so far; source of ``id``
    all = {}    # threadid -> CustThread; backs the duplicate check below

    def __init__(self, threadid):
        """Create and register the record for ``threadid``.

        ``threadid`` is the key the caller uses to identify the thread
        (Trylock passes ``thread.get_ident()``).  Creating a second
        record for the same ``threadid`` trips the assertion.
        """
        assert threadid not in CustThread.all, \
            'thread %r already has a CustThread' % (threadid,)
        CustThread.count += 1
        self.id = CustThread.count
        self.has = set()
        self.waits = set()
        # Register the record; without this the assertion above could
        # never fire because ``all`` stayed empty.
        # NOTE: this keeps a strong reference, so records are not garbage
        # collected (the author flags this as a known to-do).
        CustThread.all[threadid] = self

    def __repr__(self):
        return '<CustThread %i>' % self.id

    def s(self):
        """Return a tuple summarising this record, for debugging output."""
        return self, 'has', self.has, 'waits', self.waits

# Pair couples a thread record with a lock; Trylock._cycles builds one
# Pair per "thread holds or waits for lock" relationship.
try:
    from collections import namedtuple
except ImportError:
    # Only a missing namedtuple (Python < 2.6) should select the
    # fallback; a bare ``except:`` would also hide unrelated errors.
    class Pair:
        """Minimal stand-in for the namedtuple: ``.thread`` and ``.lock``."""

        def __init__(self, thread, lock):
            self.thread, self.lock = thread, lock

        # namedtuple compares by value, so give the fallback the same
        # semantics; otherwise ``pair in some_list`` would behave
        # differently depending on which definition was selected.
        def __eq__(self, other):
            return (isinstance(other, Pair)
                    and self.thread == other.thread
                    and self.lock == other.lock)

        def __ne__(self, other):
            return not self.__eq__(other)

        def __hash__(self):
            return hash((self.thread, self.lock))
else:
    Pair = namedtuple("Pair", "thread lock")

# Sentinel result values.  Each is a distinct anonymous object and callers
# compare against them with ``is``.  Trylock.acquire in this listing
# returns Acquires or Deadlocks; Blocks is defined but not returned.
Acquires = object()
Blocks = object()
Deadlocks = object()

# Printable name for each sentinel, for debugging output.
resultdict = dict([
    (Acquires, 'Acquires'),
    (Blocks, 'Blocks'),
    (Deadlocks, 'Deadlocks'),
])

class Trylock:
    """Non-reentrant lock whose acquire() refuses to block into a deadlock.

    Every acquire() first checks, under the class-wide ``_goplock``,
    whether letting the calling thread wait for this lock would close a
    cycle among the threads and locks currently recorded.  If so it
    returns ``Deadlocks`` immediately instead of blocking; otherwise it
    blocks on the underlying ``threading.Lock`` and returns ``Acquires``.
    """

    count = 0                    # number of Trylocks created; source of ``id``
    locks = set()                # every Trylock ever created (strong refs)
    _threads = {}                # thread ident -> CustThread record
    _goplock = threading.Lock()  # guards _threads and every has/waits set

    def __init__(self, *ar, **kwar):
        # TODO (author's note): the underlying lock should eventually be
        # an RLock so that reentrant acquisition can be supported.
        self.lock = threading.Lock()
        # No longer used by release(); kept so the attribute still exists
        # for any outside code that looks at it.
        self._oplock = threading.Lock()
        Trylock.locks.add(self)
        self.id = Trylock.count
        Trylock.count += 1

    def __repr__(self):
        return '<Trylock %i>' % self.id

    def acquire(self):
        """Acquire the lock, or refuse if blocking would deadlock.

        Returns ``Acquires`` once the lock is held, or ``Deadlocks``
        (without blocking) when the calling thread already holds this
        lock or when waiting for it would complete a cycle.
        """
        callerid = thread.get_ident()
        with Trylock._goplock:
            caller = Trylock._threads.get(callerid)
            if caller is None:
                caller = CustThread(callerid)
                Trylock._threads[callerid] = caller
            # The underlying lock is not reentrant: a holder that asks
            # again would wait on itself forever, and the cycle search
            # does not report that case.
            if self in caller.has:
                return Deadlocks
            if self._cycles():
                return Deadlocks
            caller.waits.add(self)
        # Block outside _goplock so other threads can keep running their
        # own checks and releases while this one waits.
        self.lock.acquire()
        with Trylock._goplock:
            caller.waits.remove(self)
            caller.has.add(self)
        return Acquires

    def release(self):
        """Release the lock and drop it from its holder's ``has`` set.

        The bookkeeping runs under ``_goplock``, the same lock acquire()
        and _cycles() use, so a concurrent cycle check never observes a
        half-updated ``has`` set.
        """
        with Trylock._goplock:
            holders = [th for th in Trylock._threads.values()
                       if self in th.has]
            assert len(holders) == 1
            holders[0].has.remove(self)
            self.lock.release()

    def __enter__(self):
        # acquire() returns sentinel objects, which are always truthy, so
        # the result has to be compared by identity.
        if self.acquire() is not Acquires:
            raise Exception('Deadlock')
        return self

    def __exit__(self, t, v, tb):
        self.release()

    def _cycles(self):
        """Return True if the calling thread waiting on ``self`` closes a cycle.

        Must be called with ``_goplock`` held and after the calling
        thread has a record in ``_threads``.  Builds one Pair per
        (thread, lock) relationship, whether held or awaited, adds the
        proposed (caller, self) pair, and searches breadth-first from
        the pairs on this lock: from a pair, step to the same thread's
        other locks, then to the other threads on those locks.  Reaching
        the proposed pair again means a cycle.
        """
        inth = Trylock._threads[thread.get_ident()]
        inlock = self
        edges = [Pair(th, ck)
                 for th in Trylock._threads.values()
                 for ck in th.has | th.waits]
        inpair = Pair(inth, inlock)
        edges.append(inpair)

        d = deque([e for e in edges if e.lock is inlock])
        while d:
            cur = d.popleft()
            locks = [e.lock for e in edges
                     if e.thread is cur.thread and e.lock is not cur.lock]
            for ck in locks:
                nexts = [e for e in edges
                         if ck is e.lock and e.thread is not cur.thread]
                if inpair in nexts:
                    return True
                d.extend(nexts)
        return False

def main(func):
    """Call ``func`` at once if this module is being run as a script.

    Meant to be used as a decorator on the script's entry function.
    Returns ``func`` so the decorated name stays bound to the function;
    returning nothing would rebind it to None.
    """
    if __name__ == '__main__':
        func()
    return func

@main
def fmain():
    """Stress test: six threads loop forever taking random pairs of 20 locks.

    Each worker acquires one random lock, then tries a second random
    lock (skipping it when it is the same lock), releases both and
    repeats.  An attempt that does not return ``Acquires`` is skipped.
    The threads never exit.
    """
    import random
    import sys

    locks = [Trylock() for _ in range(20)]
    # One write() per message produces the same bytes on Python 2 and 3;
    # ``print(...),`` only suppresses the extra newline on Python 2.
    say = sys.stdout.write

    def th1(i):
        while 1:
            lock = random.choice(locks)
            if lock.acquire() is not Acquires:
                continue
            say('%i th lock %s acquire\n' % (i, lock))
            sleep(.0001)
            lock2 = random.choice(locks)
            if lock2 is lock:
                pass
            elif lock2.acquire() is Acquires:
                # Report lock2 here; the message previously printed
                # ``lock`` again.
                say('%i th lock2 %s acquire\n' % (i, lock2))
                sleep(.0001)
                lock2.release()
            lock.release()
            say('%i th lock %s release\n' % (i, lock))

    ths = [threading.Thread(target=th1, args=(i,)) for i in range(6)]
    for th in ths:
        th.start()
 
D

Dennis Lee Bieber

My goal is to return Deadlock from acquire() if its blocking would
directly create deadlock. Basic example:
If you succeed with a true general solution, you may want to write
it up for some scientific journal... I don't think Hoare, Dijkstra, or
Knuth have any guaranteed means of deadlock avoidance.

Note that, if you have to traverse lists of "entities blocked and
holding" you will likely need to add locks to those lists themselves...
and run the risk of deadlocking somewhere in that, or you take so much
time in non-deadlock situations that it isn't worth the safeguard.

I am NOT going to look any further into this line...
--
Wulfraed Dennis Lee Bieber KD6MOG
(e-mail address removed) (e-mail address removed)
HTTP://wlfraed.home.netcom.com/
(Bestiaria Support Staff: (e-mail address removed))
HTTP://www.bestiaria.com/
 
C

castironpi

My goal is to return Deadlock from acquire() if its blocking would
directly create deadlock. Basic example:
[ The safeguard is never worth the cost.]
I am NOT going to look any further into this line...

I am.

Can anyone tell me if this deadlocks? Sample test block included,
feel free to shred. Does not currently permit reentrance.
Simplification of above, denies reentrant calls, eliminates ThreadDesc
class. Getting a /beep/ on threaded printing in 3.0, none in 2.5.
Got a case too, where 2444 == 2444 but 2444 is not 2444. Of course
they aren't guaranteed to, but I am testing for identity in the return
from thread.get_ident(). "The 'is' operator compares the identity of
two objects." "[After] a = 1; b = 1, a and b may or may not refer to
the same object." Humph. Anyway, without further ado, direct to you
at no extra cost, thin and narrow for your cut-and-pasting
convenience, drumroll please.
-=-=-=-=-=-=-=-
from __future__ import with_statement
import threading
import thread
from time import sleep
from collections import deque

# ThreadLockPair couples a thread ident with a lock; Trylock._cycles
# builds one per entry in each lock's ``_waiters`` set.
try:
    from collections import namedtuple
except ImportError:
    # Only a missing namedtuple (Python < 2.6) should select the
    # fallback; a bare ``except:`` would also hide unrelated errors.
    class ThreadLockPair:
        """Minimal stand-in for the namedtuple: ``.thread`` and ``.lock``."""

        def __init__(self, thread, lock):
            self.thread, self.lock = thread, lock

        # namedtuple compares by value, so give the fallback the same
        # semantics; otherwise ``pair in some_list`` would behave
        # differently depending on which definition was selected.
        def __eq__(self, other):
            return (isinstance(other, ThreadLockPair)
                    and self.thread == other.thread
                    and self.lock == other.lock)

        def __ne__(self, other):
            return not self.__eq__(other)

        def __hash__(self):
            return hash((self.thread, self.lock))
else:
    ThreadLockPair = namedtuple("ThreadLockPair", "thread lock")

# Sentinel result values.  Each is a distinct anonymous object.
# Trylock.acquire in this listing returns Acquires or Deadlocks;
# Timesout is defined but not returned.
Acquires = object()
Deadlocks = object()
Timesout = object()

# Printable name for each sentinel, for debugging output.
resultdict = dict([
    (Acquires, 'Acquires'),
    (Deadlocks, 'Deadlocks'),
    (Timesout, 'Timesout'),
])

class Trylock:
    """Non-reentrant lock whose acquire() refuses to block into a deadlock.

    Each lock records in ``_waiters`` the idents of every thread that
    holds it or is waiting for it (the owner stays in the set until
    release()).  acquire() checks those sets, under the class-wide
    ``_goplock``, for a cycle that the new wait would close and returns
    ``Deadlocks`` instead of blocking when it finds one.
    """

    _count = 0                   # number of Trylocks created; source of ``id``
    _alllocks = set()            # every Trylock ever created (strong refs)
    _goplock = threading.Lock()  # guards _owner/_waiters of all locks

    def __init__(self, *ar, **kwar):
        self._lock = threading.Lock()
        self._owner = None       # ident of the holding thread, or None
        self._waiters = set()    # idents of the holder and of blocked threads
        Trylock._alllocks.add(self)
        self.id = Trylock._count
        Trylock._count += 1

    def __repr__(self):
        return '<Trylock %i>' % self.id

    def acquire(self):
        """Acquire the lock, or refuse if blocking would deadlock.

        Returns ``Acquires`` once the lock is held, or ``Deadlocks``
        (without blocking) when the calling thread is already recorded
        on this lock or when waiting for it would complete a cycle.
        """
        caller = thread.get_ident()
        with Trylock._goplock:
            if caller in self._waiters:
                return Deadlocks
            if self._cycles(caller):
                return Deadlocks
            self._waiters.add(caller)
        # Must run unconditionally: wrapped in ``assert`` it would be
        # stripped under ``python -O`` and the lock never taken.
        self._lock.acquire()
        with Trylock._goplock:
            assert self._owner is None
            self._owner = caller
        return Acquires

    def release(self):
        """Release the lock and clear its owner bookkeeping.

        Raises KeyError if the lock has no recorded owner, because
        ``_owner`` is then None and is not in ``_waiters``.
        """
        with Trylock._goplock:
            self._waiters.remove(self._owner)
            self._owner = None
            # Release only after the owner is cleared, so the next
            # acquirer's ``_owner is None`` check holds.
            self._lock.release()

    def __enter__(self):
        # The sentinel is named ``Deadlocks``; the former ``Deadlock``
        # was undefined and raised NameError on every ``with``.
        if self.acquire() is Deadlocks:
            raise Exception('Deadlock')
        return self

    def __exit__(self, t, v, tb):
        self.release()

    def _cycles(self, thd=None):
        """Return True if thread ``thd`` waiting on ``self`` closes a cycle.

        ``thd`` defaults to the calling thread's ident.  Must be called
        with ``_goplock`` held.  Builds one ThreadLockPair per entry in
        every lock's ``_waiters`` set, adds the proposed (thd, self)
        pair, and searches breadth-first from the pairs on this lock:
        from a pair, step to the same thread's other locks, then to the
        other threads on those locks.  Reaching the proposed pair again
        means a cycle.
        """
        lck = self
        if thd is None:
            thd = thread.get_ident()
        edges = [ThreadLockPair(th, ck)
                 for ck in Trylock._alllocks
                 for th in ck._waiters]
        inpair = ThreadLockPair(thd, lck)
        edges.append(inpair)

        d = deque([e for e in edges if e.lock is lck])
        while d:
            cur = d.popleft()
            # Thread idents are plain integers: compare with ==, not
            # ``is``, since equal idents need not be the same object.
            locks = [e.lock for e in edges
                     if e.thread == cur.thread and e.lock is not cur.lock]
            for ck in locks:
                nexts = [e for e in edges
                         if ck is e.lock and e.thread != cur.thread]
                if inpair in nexts:
                    return True
                d.extend(nexts)
        return False

def main(func):
    """Call ``func`` at once if this module is being run as a script.

    Meant to be used as a decorator on the script's entry function.
    Returns ``func`` so the decorated name stays bound to the function;
    returning nothing would rebind it to None.
    """
    if __name__ == '__main__':
        func()
    return func

@main
def fmain():
    """Stress test: six threads loop forever taking random pairs of 20 locks.

    Each worker acquires one random lock, then tries a second random
    lock (possibly the same one), releases whatever it obtained and
    repeats.  An attempt that does not return ``Acquires`` is skipped.
    The threads never exit.
    """
    import random
    import sys

    locks = [Trylock() for _ in range(20)]
    # One write() per message produces the same bytes on Python 2 and 3;
    # ``print(...),`` only suppresses the extra newline on Python 2.
    say = sys.stdout.write

    def th1(i):
        while 1:
            lock = random.choice(locks)
            if lock.acquire() is not Acquires:
                continue
            say('%i th lock %s acquire\n' % (i, lock))
            sleep(.0001)
            lock2 = random.choice(locks)
            if lock2.acquire() is Acquires:
                # Report lock2 here; the message previously printed
                # ``lock`` again.
                say('%i th lock2 %s acquire\n' % (i, lock2))
                sleep(.0001)
                lock2.release()
            lock.release()
            say('%i th lock %s release\n' % (i, lock))

    ths = [threading.Thread(target=th1, args=(i,)) for i in range(6)]
    for th in ths:
        th.start()
-=-=-=-=-=-=-
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only takes a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,994
Messages
2,570,223
Members
46,810
Latest member
Kassie0918

Latest Threads

Top