Skip to content

packages.valory.skills.abstract_round_abci.behaviour_utils

This module contains helper classes for behaviours.

SendException Objects

class SendException(Exception)

Exception raised if the 'try_send' to an AsyncBehaviour failed.

TimeoutException Objects

class TimeoutException(Exception)

Exception raised when a timeout during AsyncBehaviour occurs.

AsyncBehaviour Objects

class AsyncBehaviour(ABC)

MixIn behaviour class that support limited asynchronous programming.

An AsyncBehaviour can be in three states: - READY: no suspended 'async_act' execution; - RUNNING: 'act' called, and waiting for a message - WAITING_TICK: 'act' called, and waiting for the next 'act' call

AsyncState Objects

class AsyncState(Enum)

Enumeration of AsyncBehaviour states.

__init__

def __init__() -> None

Initialize the async behaviour.

async_act

@abstractmethod
def async_act() -> Generator

Do the act, supporting asynchronous execution.

async_act_wrapper

@abstractmethod
def async_act_wrapper() -> Generator

Do the act, supporting asynchronous execution.

state

@property
def state() -> AsyncState

Get the 'async state'.

is_stopped

@property
def is_stopped() -> bool

Check whether the behaviour has stopped.

try_send

def try_send(message: Any) -> None

Try to send a message to a waiting behaviour.

It will be sent only if the behaviour is actually waiting for a message, and it was not already notified.

Arguments:

:raises: SendException if the behaviour was not waiting for a message, or if it was already notified. - message: a Python object.

wait_for_condition

@classmethod
def wait_for_condition(cls, condition: Callable[[], bool], timeout: Optional[float] = None) -> Generator[None, None, None]

Wait for a condition to happen.

This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

:yield: None - condition: the condition to wait for - timeout: the maximum amount of time to wait

sleep

def sleep(seconds: float) -> Any

Delay execution for a given number of seconds.

The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

:yield: None - seconds: the seconds

wait_for_message

def wait_for_message(condition: Callable = lambda message: True, timeout: Optional[float] = None) -> Any

Wait for message.

Care must be taken. This method does not handle concurrent requests. Use directly after a request is being sent. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • condition: a callable
  • timeout: max time to wait (in seconds)

Returns:

a message

setup

def setup() -> None

Setup behaviour.

act

def act() -> None

Do the act.

stop

def stop() -> None

Stop the execution of the behaviour.

IPFSBehaviour Objects

class IPFSBehaviour(SimpleBehaviour,  ABC)

Behaviour for interactions with IPFS.

__init__

def __init__(**kwargs: Any)

Initialize an IPFSBehaviour.

send_to_ipfs

@_check_ipfs_enabled
def send_to_ipfs(filepath: str, obj: SupportedObjectType, multiple: bool = False, filetype: Optional[SupportedFiletype] = None, custom_storer: Optional[CustomStorerType] = None, **kwargs: Any, ,) -> Optional[str]

Send a file to IPFS.

get_from_ipfs

@_check_ipfs_enabled
def get_from_ipfs(hash_: str, target_dir: str, multiple: bool = False, filename: Optional[str] = None, filetype: Optional[SupportedFiletype] = None, custom_loader: CustomLoaderType = None) -> Optional[SupportedObjectType]

Get a file from IPFS.

CleanUpBehaviour Objects

class CleanUpBehaviour(SimpleBehaviour,  ABC)

Class for clean-up related functionality of behaviours.

__init__

def __init__(**kwargs: Any)

Initialize a base behaviour.

clean_up

def clean_up() -> None

Clean up the resources due to a 'stop' event.

It can be optionally implemented by the concrete classes.

handle_late_messages

def handle_late_messages(behaviour_id: str, message: Message) -> None

Handle late arriving messages.

Runs from another behaviour, even if the behaviour implementing the method has been exited. It can be optionally implemented by the concrete classes.

Arguments:

  • behaviour_id: the id of the behaviour in which the message belongs to.
  • message: the late arriving message to handle.

RPCResponseStatus Objects

class RPCResponseStatus(Enum)

A custom status of an RPC response.

BaseBehaviour Objects

class BaseBehaviour(AsyncBehaviour,  IPFSBehaviour,  CleanUpBehaviour,  ABC)

Base class for FSM behaviours.

__init__

def __init__(**kwargs: Any)

Initialize a base behaviour.

params

@property
def params() -> BaseParams

Return the params.

synchronized_data

@property
def synchronized_data() -> BaseSynchronizedData

Return the synchronized data.

tm_communication_unhealthy

@property
def tm_communication_unhealthy() -> bool

Return if the Tendermint communication is not healthy anymore.

check_in_round

def check_in_round(round_id: str) -> bool

Check that we entered a specific round.

check_in_last_round

def check_in_last_round(round_id: str) -> bool

Check that we entered a specific round.

check_not_in_round

def check_not_in_round(round_id: str) -> bool

Check that we are not in a specific round.

check_not_in_last_round

def check_not_in_last_round(round_id: str) -> bool

Check that we are not in a specific round.

check_round_has_finished

def check_round_has_finished(round_id: str) -> bool

Check that the round has finished.

check_round_height_has_changed

def check_round_height_has_changed(round_height: int) -> bool

Check that the round height has changed.

is_round_ended

def is_round_ended(round_id: str) -> Callable[[], bool]

Get a callable to check whether the current round has ended.

wait_until_round_end

def wait_until_round_end(timeout: Optional[float] = None) -> Generator[None, None, None]

Wait until the ABCI application exits from a round.

Arguments:

:yield: None - timeout: the timeout for the wait

wait_from_last_timestamp

def wait_from_last_timestamp(seconds: float) -> Any

Delay execution for a given number of seconds from the last timestamp.

The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

:yield: None - seconds: the seconds

is_done

def is_done() -> bool

Check whether the behaviour is done.

set_done

def set_done() -> None

Set the behaviour to done.

send_a2a_transaction

def send_a2a_transaction(payload: BaseTxPayload, resetting: bool = False) -> Generator

Send transaction and wait for the response, and repeat until not successful.

:param: payload: the payload to send :param: resetting: flag indicating if we are resetting Tendermint nodes in this round. :yield: the responses

async_act_wrapper

def async_act_wrapper() -> Generator

Do the act, supporting asynchronous execution.

get_callback_request

def get_callback_request() -> Callable[[Message, "BaseBehaviour"], None]

Wrapper for callback request which depends on whether the message has not been handled on time.

Returns:

the request callback.

get_http_response

def get_http_response(method: str, url: str, content: Optional[bytes] = None, headers: Optional[List[OrderedDict[str, str]]] = None, parameters: Optional[List[Tuple[str, str]]] = None) -> Generator[None, None, HttpMessage]

Send an http request message from the skill context.

This method is skill-specific, and therefore should not be used elsewhere.

Happy-path full flow of the messages.

_do_request: AbstractRoundAbci skill -> (HttpMessage | REQUEST) -> Http client connection Http client connection -> (HttpMessage | RESPONSE) -> AbstractRoundAbci skill

Arguments:

:yield: HttpMessage object - method: the http request method (i.e. 'GET' or 'POST'). - url: the url to send the message to. - content: the payload. - headers: headers to be included. - parameters: url query parameters.

Returns:

the http message and the http dialogue

get_signature

def get_signature(message: bytes, is_deprecated_mode: bool = False) -> Generator[None, None, str]

Get signature for message.

Happy-path full flow of the messages.

_send_signing_request: AbstractRoundAbci skill -> (SigningMessage | SIGN_MESSAGE) -> DecisionMaker DecisionMaker -> (SigningMessage | SIGNED_MESSAGE) -> AbstractRoundAbci skill

Arguments:

:yield: SigningMessage object - message: message bytes - is_deprecated_mode: is deprecated mode flag

Returns:

message signature

send_raw_transaction

def send_raw_transaction(transaction: RawTransaction) -> Generator[
        None,
        Union[None, SigningMessage, LedgerApiMessage],
        Tuple[Optional[str], RPCResponseStatus],
    ]

Send raw transactions to the ledger for mining.

Happy-path full flow of the messages.

_send_transaction_signing_request: AbstractRoundAbci skill -> (SigningMessage | SIGN_TRANSACTION) -> DecisionMaker DecisionMaker -> (SigningMessage | SIGNED_TRANSACTION) -> AbstractRoundAbci skill

_send_transaction_request: AbstractRoundAbci skill -> (LedgerApiMessage | SEND_SIGNED_TRANSACTION) -> Ledger connection Ledger connection -> (LedgerApiMessage | TRANSACTION_DIGEST) -> AbstractRoundAbci skill

Arguments:

:yield: SigningMessage object - transaction: transaction data

Returns:

transaction hash

get_transaction_receipt

def get_transaction_receipt(tx_digest: str, retry_timeout: Optional[int] = None, retry_attempts: Optional[int] = None) -> Generator[None, None, Optional[Dict]]

Get transaction receipt.

Happy-path full flow of the messages.

_send_transaction_receipt_request: AbstractRoundAbci skill -> (LedgerApiMessage | GET_TRANSACTION_RECEIPT) -> Ledger connection Ledger connection -> (LedgerApiMessage | TRANSACTION_RECEIPT) -> AbstractRoundAbci skill

Arguments:

:yield: LedgerApiMessage object - tx_digest: transaction digest received from raw transaction. - retry_timeout: retry timeout. - retry_attempts: number of retry attempts allowed.

Returns:

transaction receipt data

get_ledger_api_response

def get_ledger_api_response(performative: LedgerApiMessage.Performative, ledger_callable: str, **kwargs: Any, ,) -> Generator[None, None, LedgerApiMessage]

Request data from ledger api

Happy-path full flow of the messages.

AbstractRoundAbci skill -> (LedgerApiMessage | LedgerApiMessage.Performative) -> Ledger connection Ledger connection -> (LedgerApiMessage | LedgerApiMessage.Performative) -> AbstractRoundAbci skill

Arguments:

  • performative: the message performative
  • ledger_callable: the callable to call on the contract
  • kwargs: keyword argument for the contract api request

Returns:

the contract api response

get_contract_api_response

def get_contract_api_response(performative: ContractApiMessage.Performative, contract_address: Optional[str], contract_id: str, contract_callable: str, **kwargs: Any, ,) -> Generator[None, None, ContractApiMessage]

Request contract safe transaction hash

Happy-path full flow of the messages.

AbstractRoundAbci skill -> (ContractApiMessage | ContractApiMessage.Performative) -> Ledger connection (contract dispatcher) Ledger connection (contract dispatcher) -> (ContractApiMessage | ContractApiMessage.Performative) -> AbstractRoundAbci skill

Arguments:

  • performative: the message performative
  • contract_address: the contract address
  • contract_id: the contract id
  • contract_callable: the callable to call on the contract
  • kwargs: keyword argument for the contract api request

Returns:

the contract api response

reset_tendermint_with_wait

def reset_tendermint_with_wait(on_startup: bool = False) -> Generator[None, None, bool]

Resets the tendermint node.

DegenerateBehaviour Objects

class DegenerateBehaviour(BaseBehaviour,  ABC)

An abstract matching behaviour for final and degenerate rounds.

async_act

def async_act() -> Generator

Raise a RuntimeError.

make_degenerate_behaviour

def make_degenerate_behaviour(round_id: str) -> Type[DegenerateBehaviour]

Make a degenerate behaviour class.

Back to top