G
Giampaolo Rodola'
Hi,
I'm trying to implement an asynchronous scheduler for asyncore to call
functions at a later time without blocking the main loop.
The logic behind it consists in:
- adding the scheduled functions into a heapified list
- calling a "scheduler" function at every loop which checks the
scheduled functions due to expire soonest
Note that, by using a heap, the first element of the list is always
supposed to be the one with the lower timeout.
Here's the code I wrote:
<--- snippet --->
import heapq
import time
import sys
delayed_map = []
class delayed_call:
"""Calls a function at a later time.
The instance returned is an object that can be used to cancel the
scheduled call, by calling its cancel() method.
It also may be rescheduled by calling delay() or reset()} methods.
"""
def __init__(self, delay, target, *args, **kwargs):
"""
- delay: the number of seconds to wait
- target: the callable object to call later
- args: the arguments to call it with
- kwargs: the keyword arguments to call it with
"""
assert callable(target), "%s is not callable" %target
assert sys.maxint >= delay >= 0, "%s is not greater than or
equal " \
"to 0 seconds" % (delay)
self.__delay = delay
self.__target = target
self.__args = args
self.__kwargs = kwargs
# seconds from the epoch at which to call the function
self.timeout = time.time() + self.__delay
self.cancelled = False
heapq.heappush(delayed_map, self)
def __le__(self, other):
return self.timeout <= other.timeout
def active(self):
"""Return True if this scheduler has not been cancelled."""
return not self.cancelled
def call(self):
"""Call this scheduled function."""
self.__target(*self.__args, **self.__kwargs)
def reset(self):
"""Reschedule this call resetting the current countdown."""
assert not self.cancelled, "Already cancelled"
self.timeout = time.time() + self.__delay
if delayed_map[0] is self:
heapq.heapify(delayed_map)
def delay(self, seconds):
"""Reschedule this call for a later time."""
assert not self.cancelled, "Already cancelled."
assert sys.maxint >= seconds >= 0, "%s is not greater than or
equal " \
"to 0 seconds" %(seconds)
self.__delay = seconds
self.reset()
def cancel(self):
"""Unschedule this call."""
assert not self.cancelled, "Already cancelled"
del self.__target, self.__args, self.__kwargs
if self in delayed_map:
if delayed_map[0] is self:
delayed_map.remove(self)
heapq.heapify(delayed_map)
else:
delayed_map.remove(self)
self.cancelled = True
def fun(arg):
print arg
a = delayed_call(0.6, fun, '0.6')
b = delayed_call(0.5, fun, '0.5')
c = delayed_call(0.4, fun, '0.4')
d = delayed_call(0.3, fun, '0.3')
e = delayed_call(0.2, fun, '0.2')
f = delayed_call(0.1, fun, '0.1')
while delayed_map:
now = time.time()
while delayed_map and now >= delayed_map[0].timeout:
delayed = heapq.heappop(delayed_map)
try:
delayed.call()
finally:
if not delayed.cancelled:
delayed.cancel()
time.sleep(0.01)
</--- snippet --->
Here comes the questions.
Since that the timeouts of the scheduled functions contained in the
list can change when I reset() or cancel() them I don't know exactly
*when* the list needs to be heapified().
By doing some tests I came to the conclusion that I need the heapify()
the list only when the function I reset() or cancel() is the *first of
the list* but I'm not absolutely sure about it.
When do you think it would be necessary calling heapify()?
I wrote a short test suite which tests the code above and I didn't
notice strange behaviors but since that I don't know much about the
logic behind heaps I'd need some help.
Thanks a lot in advance.
--- Giampaolo
http://code.google.com/p/pyftpdlib/
I'm trying to implement an asynchronous scheduler for asyncore to call
functions at a later time without blocking the main loop.
The logic behind it consists in:
- adding the scheduled functions into a heapified list
- calling a "scheduler" function at every loop which checks the
scheduled functions due to expire soonest
Note that, by using a heap, the first element of the list is always
supposed to be the one with the lower timeout.
Here's the code I wrote:
<--- snippet --->
import heapq
import time
import sys
delayed_map = []
class delayed_call:
"""Calls a function at a later time.
The instance returned is an object that can be used to cancel the
scheduled call, by calling its cancel() method.
It also may be rescheduled by calling delay() or reset()} methods.
"""
def __init__(self, delay, target, *args, **kwargs):
"""
- delay: the number of seconds to wait
- target: the callable object to call later
- args: the arguments to call it with
- kwargs: the keyword arguments to call it with
"""
assert callable(target), "%s is not callable" %target
assert sys.maxint >= delay >= 0, "%s is not greater than or
equal " \
"to 0 seconds" % (delay)
self.__delay = delay
self.__target = target
self.__args = args
self.__kwargs = kwargs
# seconds from the epoch at which to call the function
self.timeout = time.time() + self.__delay
self.cancelled = False
heapq.heappush(delayed_map, self)
def __le__(self, other):
return self.timeout <= other.timeout
def active(self):
"""Return True if this scheduler has not been cancelled."""
return not self.cancelled
def call(self):
"""Call this scheduled function."""
self.__target(*self.__args, **self.__kwargs)
def reset(self):
"""Reschedule this call resetting the current countdown."""
assert not self.cancelled, "Already cancelled"
self.timeout = time.time() + self.__delay
if delayed_map[0] is self:
heapq.heapify(delayed_map)
def delay(self, seconds):
"""Reschedule this call for a later time."""
assert not self.cancelled, "Already cancelled."
assert sys.maxint >= seconds >= 0, "%s is not greater than or
equal " \
"to 0 seconds" %(seconds)
self.__delay = seconds
self.reset()
def cancel(self):
"""Unschedule this call."""
assert not self.cancelled, "Already cancelled"
del self.__target, self.__args, self.__kwargs
if self in delayed_map:
if delayed_map[0] is self:
delayed_map.remove(self)
heapq.heapify(delayed_map)
else:
delayed_map.remove(self)
self.cancelled = True
def fun(arg):
print arg
a = delayed_call(0.6, fun, '0.6')
b = delayed_call(0.5, fun, '0.5')
c = delayed_call(0.4, fun, '0.4')
d = delayed_call(0.3, fun, '0.3')
e = delayed_call(0.2, fun, '0.2')
f = delayed_call(0.1, fun, '0.1')
while delayed_map:
now = time.time()
while delayed_map and now >= delayed_map[0].timeout:
delayed = heapq.heappop(delayed_map)
try:
delayed.call()
finally:
if not delayed.cancelled:
delayed.cancel()
time.sleep(0.01)
</--- snippet --->
Here comes the questions.
Since that the timeouts of the scheduled functions contained in the
list can change when I reset() or cancel() them I don't know exactly
*when* the list needs to be heapified().
By doing some tests I came to the conclusion that I need the heapify()
the list only when the function I reset() or cancel() is the *first of
the list* but I'm not absolutely sure about it.
When do you think it would be necessary calling heapify()?
I wrote a short test suite which tests the code above and I didn't
notice strange behaviors but since that I don't know much about the
logic behind heaps I'd need some help.
Thanks a lot in advance.
--- Giampaolo
http://code.google.com/p/pyftpdlib/