T
Thomas Rachel
[posted in de.clp in German]
Hello,
I want to implement an alternative concept to worker threads processing
a job queue. The alternative consists of threads being the jobs
themselves and thus running only this one job. The job threads are
started after a Semaphore's acquire() "giving the OK" to do so. The
Semaphore's release() gets called inside the jobs/threads, saying
"done, the next one please".
But the program doesn't always stop on Ctrl-C.
So I had a look on threading.Semaphore and found
def acquire(self, blocking=1):
rc = False
self.__cond.acquire() [1]
[...]
self.__cond.release() [2]
return rc
__enter__ = acquire
def release(self):
self.__cond.acquire() [3]
[...]
self.__cond.release()
Due to Strg-C, after returning from [1] there was a KeyboardInterrupt.
[2] was never reached, so the threads deadlocked on [3].
In the other classes (Event, Condition) the "helper lock calls" are
wrapped with try...finally gepackt, where after Ctrl-C
self.__cond.release() would be called as well, allowing for the
currently running jobs to finish.
First I thought about writing a bug report. But does t make it better?
Because, with the finally: solution mentionned, an exception still can
happen between finally: and release() - or not? It is less probable, the
time window being smaller, but not impossible, so this error becomes
more subtle.
Thus, the following questions:
0. (First and in general) Am I doing "bad things" when calling release()
at a completely different place than acquire() with a Semaphore? I used
to believe that's why there are Semaphores?
1. Is a bug report useful, or would that worsen the problem, as the race
condition doesn't disappear completely?
2. Is in, in general, an error to work with Semaphores (ore, more
general with Locks) in the MainThread? Or should this be done in a
separate thread being informed about a keyboard exception by the main
thread?
Below is a minimal example (t.py) demonstrating the behaviour.
python -m t 1 does the IMHO bad thing: Ctrl-C stops the MainThread and
has the others starve on the lock.
python -m t 2 does spawning in a separate thread. The MainThread reacts
to Strg-C with run = False, telling the other threads to stop ASAP.
python -m t 3 improves Semaphore() by putting self.__cond.release() into
a finally: clause.
python -m t 4 "destroys" this beautiful new concept by throwing an
Asserion at the appropriate (worst case) place (instead of a
KeyboardInterrupt).
So the question is: Bug(report) doe to finally:, or used the wrong way?
TIA and greetings,
Thomas
import threading
import time
import random
run = True
destroyed = False
sema = threading.Semaphore(5)
def target():
print threading.current_thread(), ' works'
time.sleep(max(random.normalvariate(2, .4), 0))
sema.release()
class MySema(threading._Semaphore):
def acquire(self, blocking=1):
rc = False
ok = False # for destroying
try:
self._Semaphore__cond.acquire()
while self._Semaphore__value == 0:
if not blocking:
break
if __debug__:
self._note("%s.acquire(%s): blocked waiting, value=%s",
self, blocking, self._Semaphore__value)
self._Semaphore__cond.wait()
else:
self._Semaphore__value = self._Semaphore__value - 1
if __debug__:
self._note("%s.acquire: success, value=%s",
self, self._Semaphore__value)
rc = True
ok = True # for not destroying
finally:
if destroyed and not ok: assert 0, "hier kaputt, release()
doch nicht"
self._Semaphore__cond.release()
return rc
__enter__ = acquire
def release(self):
try:
self._Semaphore__cond.acquire()
self._Semaphore__value = self._Semaphore__value + 1
if __debug__:
self._note("%s.release: success, value=%s",
self, self._Semaphore__value)
self._Semaphore__cond.notify()
finally:
self._Semaphore__cond.release()
def thread():
t = threading.Thread(target=target)
t.start()
return t
def erroneous():
global run
try:
while True:
sema.acquire()
t = thread()
print 'main thread spawned', t
finally:
run = False
def extrathread():
global run
def extra():
while run:
sema.acquire()
t = thread()
print 'control thread spawned', t
threading.Thread(target=extra).start()
try:
while True:
time.sleep(.1)
finally:
run = False
def alternative():
global sema
sema = MySema(5)
erroneous()
if __name__ == '__main__':
import sys
if len(sys.argv) < 2 or sys.argv[1] == '1':
f = erroneous
elif sys.argv[1] == '2':
f = extrathread
elif sys.argv[1] == '3':
f = alternative
else:
destroyed = True
f = alternative
print f
f()
Hello,
I want to implement an alternative concept to worker threads processing
a job queue. The alternative consists of threads being the jobs
themselves and thus running only this one job. The job threads are
started after a Semaphore's acquire() "giving the OK" to do so. The
Semaphore's release() gets called inside the jobs/threads, saying
"done, the next one please".
But the program doesn't always stop on Ctrl-C.
So I had a look on threading.Semaphore and found
def acquire(self, blocking=1):
rc = False
self.__cond.acquire() [1]
[...]
self.__cond.release() [2]
return rc
__enter__ = acquire
def release(self):
self.__cond.acquire() [3]
[...]
self.__cond.release()
Due to Strg-C, after returning from [1] there was a KeyboardInterrupt.
[2] was never reached, so the threads deadlocked on [3].
In the other classes (Event, Condition) the "helper lock calls" are
wrapped with try...finally gepackt, where after Ctrl-C
self.__cond.release() would be called as well, allowing for the
currently running jobs to finish.
First I thought about writing a bug report. But does t make it better?
Because, with the finally: solution mentionned, an exception still can
happen between finally: and release() - or not? It is less probable, the
time window being smaller, but not impossible, so this error becomes
more subtle.
Thus, the following questions:
0. (First and in general) Am I doing "bad things" when calling release()
at a completely different place than acquire() with a Semaphore? I used
to believe that's why there are Semaphores?
1. Is a bug report useful, or would that worsen the problem, as the race
condition doesn't disappear completely?
2. Is in, in general, an error to work with Semaphores (ore, more
general with Locks) in the MainThread? Or should this be done in a
separate thread being informed about a keyboard exception by the main
thread?
Below is a minimal example (t.py) demonstrating the behaviour.
python -m t 1 does the IMHO bad thing: Ctrl-C stops the MainThread and
has the others starve on the lock.
python -m t 2 does spawning in a separate thread. The MainThread reacts
to Strg-C with run = False, telling the other threads to stop ASAP.
python -m t 3 improves Semaphore() by putting self.__cond.release() into
a finally: clause.
python -m t 4 "destroys" this beautiful new concept by throwing an
Asserion at the appropriate (worst case) place (instead of a
KeyboardInterrupt).
So the question is: Bug(report) doe to finally:, or used the wrong way?
TIA and greetings,
Thomas
import threading
import time
import random
run = True
destroyed = False
sema = threading.Semaphore(5)
def target():
print threading.current_thread(), ' works'
time.sleep(max(random.normalvariate(2, .4), 0))
sema.release()
class MySema(threading._Semaphore):
def acquire(self, blocking=1):
rc = False
ok = False # for destroying
try:
self._Semaphore__cond.acquire()
while self._Semaphore__value == 0:
if not blocking:
break
if __debug__:
self._note("%s.acquire(%s): blocked waiting, value=%s",
self, blocking, self._Semaphore__value)
self._Semaphore__cond.wait()
else:
self._Semaphore__value = self._Semaphore__value - 1
if __debug__:
self._note("%s.acquire: success, value=%s",
self, self._Semaphore__value)
rc = True
ok = True # for not destroying
finally:
if destroyed and not ok: assert 0, "hier kaputt, release()
doch nicht"
self._Semaphore__cond.release()
return rc
__enter__ = acquire
def release(self):
try:
self._Semaphore__cond.acquire()
self._Semaphore__value = self._Semaphore__value + 1
if __debug__:
self._note("%s.release: success, value=%s",
self, self._Semaphore__value)
self._Semaphore__cond.notify()
finally:
self._Semaphore__cond.release()
def thread():
t = threading.Thread(target=target)
t.start()
return t
def erroneous():
global run
try:
while True:
sema.acquire()
t = thread()
print 'main thread spawned', t
finally:
run = False
def extrathread():
global run
def extra():
while run:
sema.acquire()
t = thread()
print 'control thread spawned', t
threading.Thread(target=extra).start()
try:
while True:
time.sleep(.1)
finally:
run = False
def alternative():
global sema
sema = MySema(5)
erroneous()
if __name__ == '__main__':
import sys
if len(sys.argv) < 2 or sys.argv[1] == '1':
f = erroneous
elif sys.argv[1] == '2':
f = extrathread
elif sys.argv[1] == '3':
f = alternative
else:
destroyed = True
f = alternative
print f
f()