Q
qvx
I need a scheduler which can delay execution of a
function for a certain period of time.
My attempt was something like this:
def delay(self, func, arg, delay_sec=0):
    """Queue ``func(arg)`` to be run ``delay_sec`` seconds from now.

    The entry is stored as a ``(fire_at, func, arg)`` tuple, where
    ``fire_at`` is the current ``wallclock()`` reading plus the delay.
    """
    entry = (wallclock() + delay_sec, func, arg)
    self.queue.put(entry)
def runner(self):
    """Worker loop: take queued tasks one at a time and run them when due.

    Each entry is a ``(fire_at, func, arg)`` tuple.  The loop sleeps until
    ``fire_at`` before calling ``func(arg)``; while it sleeps it does not
    look at entries queued later, even ones that are due sooner.
    Exceptions raised while handling an entry are logged, not propagated,
    and ``task_done()`` is called for every entry taken from the queue.
    """
    while self.alive:
        fire_at, func, arg = self.queue.get(block=True)
        try:
            remaining = fire_at - wallclock()
            if remaining > 0:
                time.sleep(remaining)
            func(arg)
        except Exception as e:
            # "as" form: the "except Exception, e" spelling is a syntax
            # error on Python 3.
            log('DelayedTaskManager %s: %s\n' % (self.name, e))
        finally:
            self.queue.task_done()
But then I came up with the following case:
1. I call delay with delay_sec = 10
2. The scheduler goes to sleep for 10 seconds
3. In the meantime (let's say 1 second later) I delay
another func but this time with delay_sec=0.5
4. The scheduler is sleeping and won't call my
second function for another 9 seconds instead of 0.5
I started googling for a scheduler and found one in the standard library,
but it has the same code as mine (it calls the functions in the
right order and mine doesn't, but it still waits too long).
The other schedulers from web are dealing with
repeating tasks and such.
So, I wrote this:
# modification of http://code.activestate.com/recipes/87369/
class PriorityMinQueue(Queue):
    """Queue variant that always hands out its smallest item first.

    Items live in a plain list that ``heappush``/``heappop`` keep in heap
    order, so index 0 is always the smallest item.
    """

    def _init(self, maxsize):
        # Storage hook: start with an empty list used as a heap.
        self.queue = []
        self.maxsize = maxsize

    def _put(self, item):
        # heappush returns None, so this hook returns None as well.
        heappush(self.queue, item)

    def _get(self):
        smallest = heappop(self.queue)
        return smallest

    def top(self):
        """Return the smallest queued item without removing it.

        Returns ``None`` when nothing is queued.
        """
        try:
            head = self.queue[0]
        except IndexError:
            head = None
        return head
class DelayedTaskManager:
    """Run callables on a background thread after a per-task delay.

    Pending tasks are kept in a ``PriorityMinQueue`` ordered by due time.
    The worker thread waits on the ``sleeper`` condition both while the
    queue is empty and while the earliest task is not yet due, so that
    ``delay()`` and ``stop()`` can always wake it with a notify.

    Tasks still pending when ``stop()`` is called stay in the queue and
    are picked up again by a later ``start()``.
    """

    def __init__(self, name):
        self.name = name
        self.queue = PriorityMinQueue()
        # The queue's own not_empty condition is not used for sleeping
        # (it is not signaled with notifyAll), hence a separate condition.
        self.sleeper = threading.Condition()
        # Defined up front so that stop() before start() does not fail
        # with AttributeError.
        self.alive = False
        self.thread = None
        # Tie-breaker for entries with the same due time: keeps them in
        # submission order and keeps the heap from comparing callables.
        self._sequence = 0

    def start(self):
        """Start the daemon worker thread."""
        log('start delayed task manager %s with %d elements\n' %
            (self.name, self.queue.qsize()))
        self.alive = True
        self.thread = threading.Thread(target=self.runner)
        self.thread.setDaemon(True)
        self.thread.start()

    def stop(self):
        """Ask the worker thread to exit and wait until it has."""
        log('stop delayed task manager %s with %d elements\n' %
            (self.name, self.queue.qsize()))
        self.alive = False
        self._wake()
        if self.thread is not None:
            self.thread.join()

    def delay(self, delay_sec, func, *arg, **kw):
        """Schedule ``func(*arg, **kw)`` to run ``delay_sec`` seconds from now."""
        # even if delay is 0 or less, put to queue
        # so the function gets executed concurrently
        fire_at = wallclock() + delay_sec
        # Enqueue and notify while holding the condition: the runner
        # checks the queue under the same lock, so it either sees the new
        # entry or is already waiting when the notify arrives.
        with self.sleeper:
            self._sequence += 1
            self.queue.put((fire_at, self._sequence, func, arg, kw))
            self.sleeper.notify()

    def _wake(self):
        """Wake the worker if it is waiting on ``sleeper``."""
        with self.sleeper:
            self.sleeper.notify()

    def _wait(self, timeout):
        """Wait on ``sleeper`` for up to ``timeout`` seconds."""
        with self.sleeper:
            self.sleeper.wait(timeout)

    def runner(self):
        """Worker loop: run each queued task once its due time is reached.

        Exceptions raised by a task are logged and do not end the loop.
        ``task_done()`` is called once for every task that was run.
        """
        while True:
            with self.sleeper:
                # Checked under the lock so a stop() issued at any point
                # is seen either here or as a wake-up of the wait below.
                if not self.alive:
                    log('delayed task manager %s was stoped\n' % self.name)
                    return
                top = self.queue.top()
                if top is None:
                    # Nothing queued: sleep until delay() or stop().
                    self.sleeper.wait()
                    continue
                remaining = top[0] - wallclock()
                if remaining > 0:
                    # Earliest task not due yet.  A wake-up means either
                    # stop() or a new (possibly sooner) task, so loop and
                    # re-evaluate instead of assuming the time has passed.
                    self.sleeper.wait(remaining)
                    continue
            # The earliest entry is due.  This loop is the only consumer,
            # so get() returns immediately.
            fire_at, _, func, arg, kw = self.queue.get()
            try:
                func(*arg, **kw)
            except Exception as e:
                log('delayed task manager %s: %s\n' % (self.name, e))
            finally:
                self.queue.task_done()
Is there a better way or some library that does that?
My observations:
1. Threading module uses time.sleep instead of time.clock
which results in less precise results (on windows platform)
# Take care of differences in clock accuracy between platforms.
# time.clock was removed in Python 3.8, so fall back to time.time
# whenever it is not available.
if sys.platform == "win32" and hasattr(time, "clock"):
    wallclock = time.clock
else:
    wallclock = time.time
2. While analyzing the threading module I noticed that wait() is
implemented via a loop and tiny sleep periods. I was expecting
the usage of underlying OS primitives and functions but
then I remembered about the GIL and the quasi-multithreaded nature
of Python. But still, isn't there a more precise method
that the interpreter itself could implement?
Thanks,
Tvrtko
P.S. This was Python 2.5
function for a certain period of time.
My attempt was something like this:
def delay(self, func, arg, delay_sec=0):
    """Queue ``func(arg)`` to be run ``delay_sec`` seconds from now.

    The entry is stored as a ``(fire_at, func, arg)`` tuple, where
    ``fire_at`` is the current ``wallclock()`` reading plus the delay.
    """
    entry = (wallclock() + delay_sec, func, arg)
    self.queue.put(entry)
def runner(self):
    """Worker loop: take queued tasks one at a time and run them when due.

    Each entry is a ``(fire_at, func, arg)`` tuple.  The loop sleeps until
    ``fire_at`` before calling ``func(arg)``; while it sleeps it does not
    look at entries queued later, even ones that are due sooner.
    Exceptions raised while handling an entry are logged, not propagated,
    and ``task_done()`` is called for every entry taken from the queue.
    """
    while self.alive:
        fire_at, func, arg = self.queue.get(block=True)
        try:
            remaining = fire_at - wallclock()
            if remaining > 0:
                time.sleep(remaining)
            func(arg)
        except Exception as e:
            # "as" form: the "except Exception, e" spelling is a syntax
            # error on Python 3.
            log('DelayedTaskManager %s: %s\n' % (self.name, e))
        finally:
            self.queue.task_done()
But then I came up with the following case:
1. I call delay with delay_sec = 10
2. The scheduler goes to sleep for 10 seconds
3. In the meantime (let's say 1 second later) I delay
another func but this time with delay_sec=0.5
4. The scheduler is sleeping and won't call my
second function for another 9 seconds instead of 0.5
I started googling for a scheduler and found one in the standard library,
but it has the same code as mine (it calls the functions in the
right order and mine doesn't, but it still waits too long).
The other schedulers from web are dealing with
repeating tasks and such.
So, I wrote this:
# modification of http://code.activestate.com/recipes/87369/
class PriorityMinQueue(Queue):
    """Queue variant that always hands out its smallest item first.

    Items live in a plain list that ``heappush``/``heappop`` keep in heap
    order, so index 0 is always the smallest item.
    """

    def _init(self, maxsize):
        # Storage hook: start with an empty list used as a heap.
        self.queue = []
        self.maxsize = maxsize

    def _put(self, item):
        # heappush returns None, so this hook returns None as well.
        heappush(self.queue, item)

    def _get(self):
        smallest = heappop(self.queue)
        return smallest

    def top(self):
        """Return the smallest queued item without removing it.

        Returns ``None`` when nothing is queued.
        """
        try:
            head = self.queue[0]
        except IndexError:
            head = None
        return head
class DelayedTaskManager:
    """Run callables on a background thread after a per-task delay.

    Pending tasks are kept in a ``PriorityMinQueue`` ordered by due time.
    The worker thread waits on the ``sleeper`` condition both while the
    queue is empty and while the earliest task is not yet due, so that
    ``delay()`` and ``stop()`` can always wake it with a notify.

    Tasks still pending when ``stop()`` is called stay in the queue and
    are picked up again by a later ``start()``.
    """

    def __init__(self, name):
        self.name = name
        self.queue = PriorityMinQueue()
        # The queue's own not_empty condition is not used for sleeping
        # (it is not signaled with notifyAll), hence a separate condition.
        self.sleeper = threading.Condition()
        # Defined up front so that stop() before start() does not fail
        # with AttributeError.
        self.alive = False
        self.thread = None
        # Tie-breaker for entries with the same due time: keeps them in
        # submission order and keeps the heap from comparing callables.
        self._sequence = 0

    def start(self):
        """Start the daemon worker thread."""
        log('start delayed task manager %s with %d elements\n' %
            (self.name, self.queue.qsize()))
        self.alive = True
        self.thread = threading.Thread(target=self.runner)
        self.thread.setDaemon(True)
        self.thread.start()

    def stop(self):
        """Ask the worker thread to exit and wait until it has."""
        log('stop delayed task manager %s with %d elements\n' %
            (self.name, self.queue.qsize()))
        self.alive = False
        self._wake()
        if self.thread is not None:
            self.thread.join()

    def delay(self, delay_sec, func, *arg, **kw):
        """Schedule ``func(*arg, **kw)`` to run ``delay_sec`` seconds from now."""
        # even if delay is 0 or less, put to queue
        # so the function gets executed concurrently
        fire_at = wallclock() + delay_sec
        # Enqueue and notify while holding the condition: the runner
        # checks the queue under the same lock, so it either sees the new
        # entry or is already waiting when the notify arrives.
        with self.sleeper:
            self._sequence += 1
            self.queue.put((fire_at, self._sequence, func, arg, kw))
            self.sleeper.notify()

    def _wake(self):
        """Wake the worker if it is waiting on ``sleeper``."""
        with self.sleeper:
            self.sleeper.notify()

    def _wait(self, timeout):
        """Wait on ``sleeper`` for up to ``timeout`` seconds."""
        with self.sleeper:
            self.sleeper.wait(timeout)

    def runner(self):
        """Worker loop: run each queued task once its due time is reached.

        Exceptions raised by a task are logged and do not end the loop.
        ``task_done()`` is called once for every task that was run.
        """
        while True:
            with self.sleeper:
                # Checked under the lock so a stop() issued at any point
                # is seen either here or as a wake-up of the wait below.
                if not self.alive:
                    log('delayed task manager %s was stoped\n' % self.name)
                    return
                top = self.queue.top()
                if top is None:
                    # Nothing queued: sleep until delay() or stop().
                    self.sleeper.wait()
                    continue
                remaining = top[0] - wallclock()
                if remaining > 0:
                    # Earliest task not due yet.  A wake-up means either
                    # stop() or a new (possibly sooner) task, so loop and
                    # re-evaluate instead of assuming the time has passed.
                    self.sleeper.wait(remaining)
                    continue
            # The earliest entry is due.  This loop is the only consumer,
            # so get() returns immediately.
            fire_at, _, func, arg, kw = self.queue.get()
            try:
                func(*arg, **kw)
            except Exception as e:
                log('delayed task manager %s: %s\n' % (self.name, e))
            finally:
                self.queue.task_done()
Is there a better way or some library that does that?
My observations:
1. Threading module uses time.sleep instead of time.clock
which results in less precise results (on windows platform)
# Take care of differences in clock accuracy between platforms.
# time.clock was removed in Python 3.8, so fall back to time.time
# whenever it is not available.
if sys.platform == "win32" and hasattr(time, "clock"):
    wallclock = time.clock
else:
    wallclock = time.time
2. While analyzing the threading module I noticed that wait() is
implemented via a loop and tiny sleep periods. I was expecting
the usage of underlying OS primitives and functions but
then I remembered about the GIL and the quasi-multithreaded nature
of Python. But still, isn't there a more precise method
that the interpreter itself could implement?
Thanks,
Tvrtko
P.S. This was Python 2.5