Source code for mbsim.core.tasks

"""
====
Task
====

A module to manage classes of Task objects.

Tasks are a way to run a function or coroutine function periodically.  This is useful for things like updating a value
or sending a message to a client every so often.

.. code::

    @mbsim.core.server.Task(1, args=("Hello", "world"))
    def hw(*args):
        print(*args)

    def main():
        task = mbsim.core.server.Task(1, func=hw, args=("foo", "bar"))
        mb.start()


This will print "Hello" and "world" every second. It also prints "foo" and "bar" every second.
"""
import asyncio
import inspect
import logging
import warnings
from functools import partial

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


def getloop():
    """
    Return an asyncio event loop to schedule work on.

    The currently running loop is preferred.  When no loop is running a brand
    new event loop is created and returned instead.

    :return: The running event loop, or a newly created one
    :rtype: asyncio.AbstractEventLoop
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when called outside a loop.
        loop = asyncio.new_event_loop()
    return loop
class Task:
    """
    Manage the tasks that run periodically alongside the server/client.

    Every instance registers itself in the class-level ``tasks`` list, so
    :meth:`startTasks` and :meth:`stopTasks` can act on all of them at once.

    An instance can be used as a decorator.  The following example will print
    "Hello\\nworld" every second.  Because the decorator returns the original
    function, it can be applied multiple times to schedule the function more
    than once.

    .. code::

        @mbsim.core.server.Task(1, args=("Hello", "world"))
        def hw(*args):
            for arg in args:
                print(arg)

    or

    .. code::

        mbsim.core.server.Task(1, func=hw, args=("Hello", "world"))

    Both examples are equivalent.
    """

    # Registry of every Task instance created; shared by the whole class.
    tasks = []
    # Event loop the tasks are scheduled on; assigned by startTasks().
    loop = None
    # Handle of the background _cleanup() task; assigned by startTasks().
    _clean = None

    def __init__(self, inter, func=None, args=(), kwargs=None, now=False, block=False):
        """
        Store the arguments for the task to be run and register the task.

        :param inter: Interval of time to wait before calling the function or coroutinefunction in seconds
        :type inter: float
        :param func: Function to call
        :type func: function, coroutinefunction, optional
        :param args: arguments to be passed to task
        :type args: tuple, optional
        :param kwargs: key word arguments to be passed to task, defaults to {}
        :type kwargs: dict, optional
        :param now: To run immediately or wait the interval, defaults to False
        :type now: bool, optional
        :param block: Deprecated, will be removed in v1.1.0.  A true value
            emits a ``DeprecationWarning`` and makes :meth:`wrap` reject
            coroutine functions, defaults to False
        :type block: bool, optional
        """
        if block:
            # Only warn callers that actually use the deprecated argument, and
            # point the warning at their call site rather than at this line.
            warnings.warn("The Block argument will be removed in v1.1.0", DeprecationWarning, stacklevel=2)
        self.task = None
        # Filled in by __call__, either right below or when used as a decorator.
        self.func = None
        self.inter = inter
        self.args = args
        # A fresh dict per instance avoids sharing one mutable default.
        self.kwargs = {} if kwargs is None else kwargs
        self.now = now
        self.block = block
        self.tasks.append(self)
        if func is not None:
            self(func)

    def __call__(self, func):
        """
        Bind the function to this task and return the original function.

        Returning ``func`` unchanged is what lets the instance act as a decorator.

        :param func: Function or coroutinefunction to run periodically
        :type func: function
        :return: Original function
        :rtype: function
        """
        self.func = func
        return func

    async def _run_once(self, is_coro, a, kw):
        """
        Call the task function a single time.

        :param is_coro: Whether the task function is a coroutinefunction
        :type is_coro: bool
        :param a: args
        :type a: tuple
        :param kw: kwargs
        :type kw: dict
        """
        if is_coro:
            await self.func(*a, **kw)
        else:
            # Plain callables are handed to the loop's default executor so the
            # call itself does not run on the event loop.
            await self.loop.run_in_executor(None, partial(self.func, *a, **kw))

    async def wrap(self, a, kw):
        """
        Coroutine that calls the task function every ``inter`` seconds.

        The function is called once immediately when ``now`` is set, then the
        coroutine sleeps for the interval before every further call.  It loops
        until it is cancelled; the cancellation is logged and not re-raised.

        :param a: args
        :type a: tuple
        :param kw: kwargs
        :type kw: dict
        :raises ValueError: If the function is a coroutinefunction and ``block`` is set
        """
        is_coro = inspect.iscoroutinefunction(self.func)
        try:
            if is_coro and self.block:
                raise ValueError("{} cannot be blocking if is coroutine".format(self.func.__name__))
            if self.now:
                await self._run_once(is_coro, a, kw)
            while True:
                await asyncio.sleep(self.inter)
                await self._run_once(is_coro, a, kw)
        except asyncio.CancelledError:
            log.debug("Task %s canceled", (self.func.__name__, self.inter, self.args, self.kwargs))

    def start(self):
        """
        Start the task by scheduling :meth:`wrap` on the class event loop.

        The class attribute ``loop`` must already be set, which
        :meth:`startTasks` takes care of.
        """
        log.debug("Starting task: %s", (self.func.__name__, self.inter, self.args, self.kwargs))
        self.task = self.loop.create_task(self.wrap(self.args, self.kwargs))

    @classmethod
    def startTasks(cls, loop=None):
        """
        Class method to start all registered tasks.

        On the first call the event loop is fixed to ``loop``, or to the
        result of ``getloop()`` when no loop is given.  A background cleanup
        task is scheduled before the registered tasks are started.

        :param loop: Event loop to run the tasks on
        :type loop: asyncio.AbstractEventLoop, optional
        :raises RuntimeError: If a loop is passed while one is already set
        """
        if cls.loop and loop:
            raise RuntimeError("Cannot replace loop in Tasks")
        if cls.loop is None:
            cls.loop = loop if loop is not None else getloop()
        cls._clean = cls.loop.create_task(_cleanup(cls.loop))
        for task in cls.tasks:
            task.start()

    def stop(self):
        """
        Stop the task by cancelling its scheduled coroutine.

        Does nothing when the task was never started.
        """
        if self.task is None:
            # Never started, so there is nothing to cancel.
            return
        log.debug("Stopping task: %s", (self.func.__name__, self.inter, self.args, self.kwargs))
        self.task.cancel()

    @classmethod
    def stopTasks(cls):
        """
        Class method to stop all registered tasks.

        Also cancels the background cleanup task when :meth:`startTasks`
        created one.
        """
        for task in cls.tasks:
            task.stop()
        if cls._clean is not None:
            cls._clean.cancel()
async def _cleanup(loop):
    """
    Coroutine that gives all async tasks time to clean up before the loop stops.

    It idles in short sleeps until it is cancelled.  On cancellation it waits
    one more second and then stops the event loop.

    :param loop: Event loop to stop once the grace period has passed
    """
    idle_tick = 0.05
    grace_period = 1
    while True:
        try:
            await asyncio.sleep(idle_tick)
        except asyncio.CancelledError:
            # Cancellation is the shutdown signal: wait out the grace period,
            # then stop the loop.
            await asyncio.sleep(grace_period)
            loop.stop()
            return