from twisted.internet.task import LoopingCall
class Task(object):
    """Base class for units of work scheduled on a ticker.

    Subclasses are expected to assign ``ticker`` (the object the task is
    registered with) and to provide an ``update()`` method.
    """

    #: The ticker this task is registered with; ``None`` until assigned
    ticker = None

    def stop(self):
        """
        Cancel the task by removing it from its ticker.

        Calling this on a task that has no ticker, or that has already been
        removed from its ticker, does nothing.
        """
        ticker = self.ticker
        if ticker is None:
            # Never attached to a ticker, so there is nothing to cancel.
            return
        try:
            ticker.remove(self)
        except ValueError:
            # Already removed (e.g. stopped twice, or the ticker was cleared
            # while this task's callback was running). Stopping is idempotent.
            pass
class LoopTask(Task):
    """Repeating task: runs its callback on every tick that is a multiple
    of ``interval``."""

    def __init__(self, ticker, interval, callback):
        """
        :param ticker: The ticker whose ``tick`` counter is consulted
        :param interval: The interval in ticks
        :param callback: The zero-argument callable to run
        """
        self.ticker, self.interval, self.callback = ticker, interval, callback

    def update(self):
        """
        Run the callback if the ticker's current tick is evenly divisible by
        the interval; otherwise do nothing.
        """
        remainder = self.ticker.tick % self.interval
        if remainder != 0:
            return
        self.callback()
class DelayTask(Task):
    """One-shot task: runs its callback once the ticker reaches a target
    tick, then stops itself."""

    def __init__(self, ticker, delay, callback):
        """
        :param ticker: The ticker whose ``tick`` counter is consulted
        :param delay: The delay in ticks
        :param callback: The zero-argument callable to run
        """
        self.ticker, self.delay, self.callback = ticker, delay, callback
        # Arm the task relative to the tick at construction time.
        self.restart()

    def restart(self):
        """
        Reset the target tick to the ticker's current tick plus the delay.

        This only recomputes the target; it does not register the task with
        the ticker.
        """
        now = self.ticker.tick
        self.target = now + self.delay

    def update(self):
        """
        Run the callback and then stop the task once the ticker's current
        tick has reached the target; otherwise do nothing.
        """
        expired = self.ticker.tick >= self.target
        if not expired:
            return
        self.callback()
        self.stop()
class Ticker(object):
    """Fixed-rate tick loop that drives registered tasks.

    A Twisted ``LoopingCall`` (created via ``withCount``) invokes
    :meth:`_update`; each tick calls ``update()`` on every registered task
    and then increments :attr:`tick`.
    """

    #: The current tick
    tick = 0

    #: Interval between ticks, in seconds
    interval = 1.0/20

    #: Maximum number of delayed ticks before they're all skipped
    max_lag = 40

    #: Whether :meth:`start` has been called without a matching :meth:`stop`
    running = False

    def __init__(self, logger):
        """
        :param logger: Object whose ``warn()`` and ``exception()`` methods
                       are used for reporting
        """
        self._logger = logger
        self._tasks = []
        self._impl = LoopingCall.withCount(self._update)

    def start(self):
        """
        Start running the tick loop.
        """
        if not self.running:
            self._impl.start(self.interval, now=False)
            self.running = True

    def stop(self):
        """
        Stop running the tick loop.
        """
        if self.running:
            self._impl.stop()
            self.running = False

    def add_loop(self, interval, callback):
        """
        Repeatedly run a callback.

        Exceptions raised by the callback are logged rather than propagated.

        :param interval: The interval in ticks
        :param callback: The callback to run
        :return: An instance providing a ``stop()`` method
        """
        task = LoopTask(self, interval, self._wrap(callback))
        self._tasks.append(task)
        return task

    def add_delay(self, delay, callback):
        """
        Run a callback after a delay.

        Exceptions raised by the callback are logged rather than propagated.

        :param delay: The delay in ticks
        :param callback: The callback to run
        :return: An instance providing ``stop()`` and ``restart()`` methods
        """
        task = DelayTask(self, delay, self._wrap(callback))
        self._tasks.append(task)
        return task

    def remove(self, task):
        """
        Removes a task, effectively cancelling it.

        Removing a task that is not (or is no longer) registered is a no-op,
        so a task may safely be cancelled more than once.

        :param task: The task to remove
        """
        try:
            self._tasks.remove(task)
        except ValueError:
            # Not registered: already removed, or cleared by remove_all().
            pass

    def remove_all(self):
        """
        Removes all registered tasks, effectively cancelling them.
        """
        del self._tasks[:]

    def _update(self, count):
        """
        Advance the loop by ``count`` ticks, updating every task on each one.

        :param count: Number of ticks to run, as supplied by
                      ``LoopingCall.withCount``
        """
        if count >= self.max_lag:
            # Too far behind: report the backlog and run a single tick.
            # NOTE(review): ``warn`` is a deprecated alias of ``warning`` on
            # stdlib loggers; kept as-is because the logger type is not
            # visible here.
            self._logger.warn("Can't keep up! Skipping %d ticks" % (count - 1))
            count = 1

        for _ in range(count):
            # Iterate over a snapshot so tasks may add/remove tasks while
            # the tick is being processed.
            for task in list(self._tasks):
                # A task cancelled earlier in this same tick must not run.
                if task not in self._tasks:
                    continue
                task.update()
            self.tick += 1

    def _wrap(self, callback):
        """
        Return a zero-argument function that calls ``callback`` and logs,
        instead of propagating, any ``Exception`` it raises.

        :param callback: The zero-argument callable to guard
        """
        def fn():
            try:
                callback()
            except Exception as e:
                self._logger.exception(e)
        return fn