plugins.aea-test-autonomy.aea_
test_
autonomy.helpers.async_
utils
Helpers for Pytest tests with asynchronous programming.
wait_
for_
condition
def wait_for_condition(condition_checker: Callable,
timeout: int = 2,
error_msg: str = "Timeout",
period: float = 0.001) -> None
Wait for condition occures in selected timeout.
AnotherThreadTask Objects
class AnotherThreadTask()
Schedule a task to run on the loop in another thread.
Provides better cancel behaviour: on cancel it will wait till cancelled completely.
__
init__
def __init__(coro: Union[Coroutine[Any, Any, Any], Generator[Any, None, Any]],
loop: AbstractEventLoop) -> None
Init the task.
Arguments:
coro
: coroutine to scheduleloop
: an event loop to schedule on.
result
def result(timeout: Optional[float] = None) -> Any
Wait for coroutine execution result.
Arguments:
timeout
: optional timeout to wait in seconds.
Returns:
result
cancel
def cancel() -> None
Cancel coroutine task execution in a target loop.
done
def done() -> bool
Check task is done.
ThreadedAsyncRunner Objects
class ThreadedAsyncRunner(Thread)
Util to run thread with event loop and execute coroutines inside.
__
init__
def __init__(loop: Optional[AbstractEventLoop] = None) -> None
Init threaded runner.
Arguments:
loop
: optional event loop. is it's running loop, threaded runner will use it.
start
def start() -> None
Start event loop in dedicated thread.
run
def run() -> None
Run code inside thread.
call
def call(
coro: Union[Coroutine[Any, Any, Any], Generator[Any, None,
Any]]) -> Any
Run a coroutine inside the event loop.
Arguments:
coro
: a coroutine to run.
Returns:
task
stop
def stop() -> None
Stop event loop in thread.
BaseThreadedAsyncLoop Objects
class BaseThreadedAsyncLoop()
Test class with a threaded event loop running.
setup
def setup() -> None
Set up the class.
execute
def execute(coro: Union[Coroutine[Any, Any, Any], Generator[Any, None, Any]],
timeout: float = DEFAULT_ASYNC_TIMEOUT) -> Any
Execute a coroutine and wait its completion.
teardown
def teardown() -> None
Teardown the class.