
packages.valory.skills.abstract_round_abci.test_tools.integration

Integration tests for the various failure modes of the transaction settlement skill.

IntegrationBaseCase Objects

class IntegrationBaseCase(FSMBehaviourBaseCase)

Base test class for integration tests.

setup_class

@classmethod
def setup_class(cls, **kwargs: Any) -> None

Set up the test class.

teardown_class

@classmethod
def teardown_class(cls) -> None

Tear down the multiplexer.

get_message_from_decision_maker_inbox

def get_message_from_decision_maker_inbox() -> Optional[Message]

Get message from decision maker inbox.

process_message_cycle

def process_message_cycle(handler: Optional[Handler] = None, expected_content: Optional[Dict] = None, expected_types: Optional[Dict] = None, mining_interval_secs: float = 0) -> Optional[Message]

Processes one request-response type message cycle.

Steps:

  1. Calls act on behaviour to generate outgoing message.
  2. Checks for message in outbox.
  3. Sends message to multiplexer and waits for response.
  4. Passes message to handler.
  5. Calls act on behaviour to process incoming message.

Arguments:

  • handler: the handler to handle a potential incoming message
  • expected_content: the expected content of the incoming message
  • expected_types: the expected types of the incoming message's content
  • mining_interval_secs: the mining interval used in the tests

Returns:

the incoming message
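The five steps above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions, not the framework's implementation: `ToyBehaviour`, `toy_message_cycle` and the `respond` callback are hypothetical names, messages are plain dictionaries, and the multiplexer round trip is replaced by a local function call. The toy also assumes that steps 2 to 4 are skipped when no handler is given.

```python
from typing import Any, Callable, Dict, List, Optional

Message = Dict[str, Any]  # toy message type, NOT the framework's Message class


class ToyBehaviour:
    """Hypothetical stand-in for a behaviour under test."""

    def __init__(self) -> None:
        self.outbox: List[Message] = []
        self.inbox: List[Message] = []
        self.processed: List[Message] = []

    def act(self) -> None:
        if self.inbox:
            # A response is waiting: consume it.
            self.processed.append(self.inbox.pop(0))
        else:
            # Nothing pending: emit a new request.
            self.outbox.append({"performative": "request", "body": "ping"})


def toy_message_cycle(
    behaviour: ToyBehaviour,
    respond: Callable[[Message], Message],  # assumption: replaces the multiplexer
    handler: Optional[Callable[[Message], None]] = None,
    expected_content: Optional[Dict[str, Any]] = None,
    expected_types: Optional[Dict[str, type]] = None,
) -> Optional[Message]:
    behaviour.act()  # 1. generate the outgoing message
    incoming: Optional[Message] = None
    if handler is not None:
        assert len(behaviour.outbox) == 1  # 2. check the outbox
        outgoing = behaviour.outbox.pop(0)
        incoming = respond(outgoing)  # 3. send and wait for the response
        for key, value in (expected_content or {}).items():
            assert incoming[key] == value, f"unexpected content for {key!r}"
        for key, expected_type in (expected_types or {}).items():
            assert isinstance(incoming[key], expected_type), f"unexpected type for {key!r}"
        handler(incoming)  # 4. pass the response to the handler
    behaviour.act()  # 5. let the behaviour process the response
    return incoming


behaviour = ToyBehaviour()
reply = toy_message_cycle(
    behaviour,
    respond=lambda msg: {"performative": "response", "body": msg["body"].upper()},
    handler=behaviour.inbox.append,
    expected_content={"performative": "response"},
    expected_types={"body": str},
)
```

Here the handler simply appends the response to the toy behaviour's inbox, so the second `act` call consumes it. In a real test the handler, the behaviour and the message classes come from the skill under test.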

process_n_messages

def process_n_messages(ncycles: int, synchronized_data: Optional[BaseSynchronizedData] = None, behaviour_id: Optional[str] = None, handlers: Optional[HandlersType] = None, expected_content: Optional[ExpectedContentType] = None, expected_types: Optional[ExpectedTypesType] = None, fail_send_a2a: bool = False, mining_interval_secs: float = 0) -> Tuple[Optional[Message], ...]

Process n message cycles.

Arguments:

  • behaviour_id: the behaviour to fast forward to
  • ncycles: the number of message cycles to process
  • synchronized_data: the synchronized data
  • handlers: a list of handlers
  • expected_content: the expected content
  • expected_types: the expected types
  • fail_send_a2a: flag indicating whether to simulate a failure in send_a2a_transaction
  • mining_interval_secs: the mining interval used in the tests

Returns:

tuple of incoming messages
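As a rough illustration of how several cycles could be driven in sequence, the sketch below loops over `ncycles` and collects the incoming messages into a tuple. It is a toy model, not the library's code: `toy_cycle` and `toy_process_n_messages` are hypothetical names, the response is fabricated locally, and it assumes that `handlers`, `expected_content` and `expected_types` each hold one entry per cycle (with `None` meaning "nothing to check").

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Message = Dict[str, Any]  # toy message type, NOT the framework's Message class
Handler = Callable[[Message], None]


def toy_cycle(
    index: int,
    handler: Optional[Handler],
    expected_content: Optional[Dict[str, Any]],
    expected_types: Optional[Dict[str, type]],
) -> Optional[Message]:
    """Stand-in for one request-response cycle; fabricates the response."""
    if handler is None:
        return None  # assumption: no handler means nothing is exchanged
    incoming: Message = {"performative": "response", "cycle": index}
    for key, value in (expected_content or {}).items():
        assert incoming[key] == value, f"unexpected content for {key!r}"
    for key, expected_type in (expected_types or {}).items():
        assert isinstance(incoming[key], expected_type), f"unexpected type for {key!r}"
    handler(incoming)
    return incoming


def toy_process_n_messages(
    ncycles: int,
    handlers: Optional[List[Optional[Handler]]] = None,
    expected_content: Optional[List[Optional[Dict[str, Any]]]] = None,
    expected_types: Optional[List[Optional[Dict[str, type]]]] = None,
) -> Tuple[Optional[Message], ...]:
    # Default every per-cycle argument to "nothing to do" for each cycle.
    handlers = [None] * ncycles if handlers is None else handlers
    expected_content = [None] * ncycles if expected_content is None else expected_content
    expected_types = [None] * ncycles if expected_types is None else expected_types
    assert len(handlers) == len(expected_content) == len(expected_types) == ncycles
    return tuple(
        toy_cycle(i, handlers[i], expected_content[i], expected_types[i])
        for i in range(ncycles)
    )


seen: List[Message] = []
messages = toy_process_n_messages(
    ncycles=2,
    handlers=[seen.append, None],
    expected_content=[{"performative": "response"}, None],
    expected_types=[{"cycle": int}, None],
)
```

The returned tuple has one entry per cycle, which matches the `Tuple[Optional[Message], ...]` return type in the signature above; an entry is `None` for a cycle in which the toy received no message.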

_HarHatHelperIntegration Objects

class _HarHatHelperIntegration(IntegrationBaseCase)

Base test class for integration tests with HardHat provider.

setup_class

@classmethod
def setup_class(cls, **kwargs: Any) -> None

Set up the test class.
