Skip to content

plugins.aea-test-autonomy.aea_test_autonomy.helpers.async_utils

Helpers for Pytest tests with asynchronous programming.

wait_for_condition

def wait_for_condition(condition_checker: Callable,
                       timeout: int = 2,
                       error_msg: str = "Timeout",
                       period: float = 0.001) -> None

Wait for condition occures in selected timeout.

AnotherThreadTask Objects

class AnotherThreadTask()

Schedule a task to run on the loop in another thread.

Provides better cancel behaviour: on cancel it will wait till cancelled completely.

__init__

def __init__(coro: Union[Coroutine[Any, Any, Any], Generator[Any, None, Any]],
             loop: AbstractEventLoop) -> None

Init the task.

Arguments:

  • coro: coroutine to schedule
  • loop: an event loop to schedule on.

result

def result(timeout: Optional[float] = None) -> Any

Wait for coroutine execution result.

Arguments:

  • timeout: optional timeout to wait in seconds.

Returns:

result

cancel

def cancel() -> None

Cancel coroutine task execution in a target loop.

done

def done() -> bool

Check task is done.

ThreadedAsyncRunner Objects

class ThreadedAsyncRunner(Thread)

Util to run thread with event loop and execute coroutines inside.

__init__

def __init__(loop: Optional[AbstractEventLoop] = None) -> None

Init threaded runner.

Arguments:

  • loop: optional event loop. is it's running loop, threaded runner will use it.

start

def start() -> None

Start event loop in dedicated thread.

run

def run() -> None

Run code inside thread.

call

def call(
        coro: Union[Coroutine[Any, Any, Any], Generator[Any, None,
                                                        Any]]) -> Any

Run a coroutine inside the event loop.

Arguments:

  • coro: a coroutine to run.

Returns:

task

stop

def stop() -> None

Stop event loop in thread.

BaseThreadedAsyncLoop Objects

class BaseThreadedAsyncLoop()

Test class with a threaded event loop running.

setup

def setup() -> None

Set up the class.

execute

def execute(coro: Union[Coroutine[Any, Any, Any], Generator[Any, None, Any]],
            timeout: float = DEFAULT_ASYNC_TIMEOUT) -> Any

Execute a coroutine and wait its completion.

teardown

def teardown() -> None

Teardown the class.