Source code for aioshell.executor

"""This module provides only the Executor class."""
import asyncio
from .runnable import Runnable


[docs]class Executor: """Less boilerplate to run asynchronous tasks. **Basic usage**: call :func:`add` once for every :class:`Runnable` object (:class:`Shell`, :class:`SSH`) and, in the end, call :func:`finish` to wait for them to finish and clean resources. Example:: from aioshell import Executor, Shell exe = Executor() exe.add(Shell('date >/tmp/aioshell; sleep 1')) exe.add(Shell('sleep 1; date >>/tmp/aioshell')) exe.finish() In the */tmp/test* file, you'll notice that it took only 1 second instead of 2. """ _loop = None def __init__(self): """Get the main event loop.""" self.futures = [] """(*Advanced usage*) accumulated results of :func:`add`. Useful for further information about executions. If :func:`add` is called with a :class:`Runnable` or :ref:`coroutine <coroutine>` object, a correpondent Task will be appended to this list. If a :class:`Future <asyncio.Future>` is added, the Future itself will be appended. :type: list of :class:`Task <asyncio.Task>` or :class:`Future <asyncio.Future>` """ self.loop = Executor._get_event_loop() """(*Advanced usage*) event loop. This event loop is shared between all objects until it is closed. If the loop is closed, automatically create a new one (it will also be shared between new objects). :type: :class:`BaseEventLoop <asyncio.BaseEventLoop>` """ @classmethod def _get_event_loop(cls): """If the asyncio loop is closed, create a new one and set it.""" if cls._loop is None: cls._loop = asyncio.get_event_loop() if cls._loop.is_closed(): cls._loop = asyncio.new_event_loop() asyncio.set_event_loop(cls._loop) return cls._loop
[docs] def add(self, runnable_coro_future): """Run a Runnable (basic usage), coroutine or Future. *Note for advanced users:* each call will append a correspondent :class:`Task <asyncio.Task>` or :class:`Future <asyncio.Future>` to :attr:`futures` in case you want more information about the execution than provided by this class. :param runnable_coro_future: action to be performed asynchronously. :type runnable_coro_future: :class:`Runnable` (e.g. :class:`Shell`, :class:`SSH`), :ref:`coroutine <coroutine>` or :class:`Future <asyncio.Future>` objects """ coro_future = _get_coro_future(runnable_coro_future) task = asyncio.ensure_future(coro_future) self.futures.append(task) return task
[docs] def run_wait(self, runnable_coro_future): """Block until *runnable_coro_future* is finished. It will wait for only this `runnable_coro_future`. Useful if you want the traditional behaviour of sequential programming for some reason. Otherwise, use the concurrent and faster version :func:`add`. :param runnable_coro_future: action to be performed synchronously. :type runnable_coro_future: :class:`Runnable` (e.g. :class:`Shell`, :class:`SSH`), :ref:`coroutine <coroutine>` or :class:`Future <asyncio.Future>` objects """ coro_future = _get_coro_future(runnable_coro_future) return self.loop.run_until_complete(coro_future)
[docs] def wait(self): """Block until all added tasks by :func:`add` are done. You can add more tasks later. When you are finished, call :func:`finish`. """ if self.futures: self.loop.run_until_complete(asyncio.wait(self.futures))
[docs] def finish(self): """Wait for all tasks, close the event loop and clean resources. You should call this method in the end of your program. It performs 3 actions: #. Wait for all added tasks (by :func:`add`) to be finished (like :func:`wait` does); #. Clear :attr:`futures` list; #. Close the main event loop: * Allow clean exit, without warnings; * If you need to run anything else, use a new object of this class. """ self.wait() self.loop.close() self.futures.clear()
def _get_coro_future(runnable_coro_future): """If it is a Runnable object, call run() to get the coroutine.""" if isinstance(runnable_coro_future, Runnable): coro_future = runnable_coro_future.run() else: coro_future = runnable_coro_future return coro_future