packages.valory.skills.abstract_round_abci.behaviour_utils

This module contains helper classes for behaviours.

SendException Objects

class SendException(Exception)

Exception raised if the 'try_send' to an AsyncBehaviour failed.

TimeoutException Objects

class TimeoutException(Exception)

Exception raised when a timeout during AsyncBehaviour occurs.

BaseBehaviourInternalError Objects

class BaseBehaviourInternalError(Exception)

Internal error due to a bad implementation of the BaseBehaviour.

__init__

def __init__(message: str, *args: Any) -> None

Initialize the error object.

AsyncBehaviour Objects

class AsyncBehaviour(ABC)

MixIn behaviour class that supports limited asynchronous programming.

An AsyncBehaviour can be in one of three states:

  • READY: no suspended 'async_act' execution;
  • RUNNING: 'act' called, and waiting for a message;
  • WAITING_TICK: 'act' called, and waiting for the next 'act' call.
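The three states can be illustrated with a toy driver around a plain Python generator. This is a minimal sketch, not the library's implementation: `ToyAsyncBehaviour`, `_step` and the local `SendException` stand-in are hypothetical names, and the sketch delivers a message immediately inside `try_send`, which may differ from how the real class schedules delivery.

```python
from enum import Enum
from typing import Any, Generator, List, Optional


class AsyncState(Enum):
    READY = "ready"
    RUNNING = "running"
    WAITING_TICK = "waiting_tick"


class SendException(Exception):
    """Local stand-in for the module's SendException."""


class ToyAsyncBehaviour:
    """Hypothetical, simplified driver; not the library's implementation."""

    def __init__(self) -> None:
        self.state = AsyncState.READY
        self.log: List[str] = []
        self._gen: Optional[Generator] = None

    def wait_for_message(self) -> Generator[None, Any, Any]:
        self.state = AsyncState.RUNNING  # suspended until a message arrives
        message = yield
        return message

    def async_act(self) -> Generator:
        self.log.append("started")
        yield  # suspend until the next act() call
        message = yield from self.wait_for_message()
        self.log.append(f"got {message}")

    def act(self) -> None:
        if self.state == AsyncState.READY:
            self._gen = self.async_act()  # start a fresh execution
            self._step(None)
        elif self.state == AsyncState.WAITING_TICK:
            self._step(None)
        # RUNNING: nothing to do until try_send delivers a message

    def try_send(self, message: Any) -> None:
        if self.state != AsyncState.RUNNING:
            raise SendException("behaviour is not waiting for a message")
        self._step(message)

    def _step(self, value: Any) -> None:
        assert self._gen is not None
        # Default state after a step, unless the generator asks for a message.
        self.state = AsyncState.WAITING_TICK
        try:
            self._gen.send(value)
        except StopIteration:
            self.state = AsyncState.READY  # async_act ran to completion


behaviour = ToyAsyncBehaviour()
behaviour.act()              # READY -> WAITING_TICK
behaviour.act()              # WAITING_TICK -> RUNNING
behaviour.try_send("hello")  # RUNNING -> READY (generator finished)
```

In this sketch a plain `yield` means "wait for the next tick", while `yield from self.wait_for_message()` means "wait for a message".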

AsyncState Objects

class AsyncState(Enum)

Enumeration of AsyncBehaviour states.

__init__

def __init__() -> None

Initialize the async behaviour.

async_act

@abstractmethod
def async_act() -> Generator

Do the act, supporting asynchronous execution.

async_act_wrapper

@abstractmethod
def async_act_wrapper() -> Generator

Do the act, supporting asynchronous execution.

state

@property
def state() -> AsyncState

Get the 'async state'.

is_notified

@property
def is_notified() -> bool

Returns whether the behaviour has been notified about the arrival of a message.

received_message

@property
def received_message() -> Any

Returns the message the behaviour has received. "__message" should be None if not available or already consumed.

is_stopped

@property
def is_stopped() -> bool

Check whether the behaviour has stopped.

try_send

def try_send(message: Any) -> None

Try to send a message to a waiting behaviour.

It will be sent only if the behaviour is actually waiting for a message, and it was not already notified.

Arguments:

  • message: a Python object.

Raises:

SendException if the behaviour was not waiting for a message, or if it was already notified.

wait_for_condition

@classmethod
def wait_for_condition(cls, condition: Callable[[], bool], timeout: Optional[float] = None) -> Generator[None, None, None]

Wait for a condition to happen.

This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • condition: the condition to wait for.
  • timeout: the maximum amount of time to wait.

Yields:

None
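The polling pattern described here can be sketched as a generator that yields until the condition holds or a local deadline passes. This is an illustrative sketch under stated assumptions, not the library's code: it is written as a free function rather than a classmethod, the `TimeoutException` class is a local stand-in, and it assumes the timeout is expressed in seconds and measured with `datetime.now()`.

```python
from datetime import datetime, timedelta
from typing import Callable, Generator, Optional


class TimeoutException(Exception):
    """Local stand-in for the module's TimeoutException."""


def wait_for_condition(
    condition: Callable[[], bool], timeout: Optional[float] = None
) -> Generator[None, None, None]:
    """Yield until `condition()` is true; raise on timeout (assumed seconds)."""
    deadline = None
    if timeout is not None:
        deadline = datetime.now() + timedelta(seconds=timeout)
    while not condition():
        if deadline is not None and datetime.now() > deadline:
            raise TimeoutException()
        yield  # hand control back; the caller resumes us on the next tick


# Hypothetical condition that becomes true on its third evaluation.
ticks = {"count": 0}


def third_check_passes() -> bool:
    ticks["count"] += 1
    return ticks["count"] >= 3


# The generator suspends once per failed check before finishing.
suspensions = sum(1 for _ in wait_for_condition(third_check_passes))
```

Inside a behaviour, such a generator would typically be consumed with `yield from` rather than iterated directly.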

sleep

def sleep(seconds: float) -> Any

Delay execution for a given number of seconds.

The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • seconds: the seconds

Yields:

None

wait_for_message

def wait_for_message(condition: Callable = lambda message: True, timeout: Optional[float] = None) -> Any

Wait for message.

Care must be taken: this method does not handle concurrent requests. Use it directly after a request has been sent. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • condition: a callable
  • timeout: max time to wait (in seconds)

Returns:

a message

setup

def setup() -> None

Set up the behaviour.

act

def act() -> None

Do the act.

stop

def stop() -> None

Stop the execution of the behaviour.

IPFSBehaviour Objects

class IPFSBehaviour(SimpleBehaviour, ABC)

Behaviour for interactions with IPFS.

__init__

def __init__(**kwargs: Any)

Initialize an IPFSBehaviour.

CleanUpBehaviour Objects

class CleanUpBehaviour(SimpleBehaviour, ABC)

Class for clean-up related functionality of behaviours.

__init__

def __init__(**kwargs: Any)

Initialize a base behaviour.

clean_up

def clean_up() -> None

Clean up the resources due to a 'stop' event.

It can be optionally implemented by the concrete classes.

handle_late_messages

def handle_late_messages(behaviour_id: str, message: Message) -> None

Handle late arriving messages.

Runs from another behaviour, even if the behaviour implementing the method has already exited. It can be optionally implemented by the concrete classes.

Arguments:

  • behaviour_id: the id of the behaviour to which the message belongs.
  • message: the late arriving message to handle.

RPCResponseStatus Objects

class RPCResponseStatus(Enum)

A custom status of an RPC response.

_MetaBaseBehaviour Objects

class _MetaBaseBehaviour(ABCMeta)

A metaclass that validates BaseBehaviour's attributes.

__new__

def __new__(mcs, name: str, bases: Tuple, namespace: Dict, **kwargs: Any) -> Type

Initialize the class.
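Validating class attributes at class-creation time can be sketched with a metaclass that overrides `__new__`. The example below is a hypothetical illustration, not the library's implementation: `_ToyMeta`, `ToyBaseBehaviour` and the exact rule (non-abstract classes must define `matching_round`) are assumptions made for the sketch.

```python
import inspect
from abc import ABC, ABCMeta, abstractmethod
from typing import Any, Dict, Generator, Tuple, Type


class _ToyMeta(ABCMeta):
    """Hypothetical metaclass that validates attributes of concrete classes."""

    def __new__(mcs, name: str, bases: Tuple, namespace: Dict, **kwargs: Any) -> Type:
        new_cls = super().__new__(mcs, name, bases, namespace, **kwargs)
        if inspect.isabstract(new_cls):
            return new_cls  # abstract classes are not validated
        if getattr(new_cls, "matching_round", None) is None:
            raise TypeError(f"'matching_round' not set on {name}")
        return new_cls


class ToyBaseBehaviour(ABC, metaclass=_ToyMeta):
    matching_round: Any = None

    @abstractmethod
    def async_act(self) -> Generator:
        """Do the act."""


class GoodBehaviour(ToyBaseBehaviour):
    matching_round = "SomeRound"  # placeholder for a round class

    def async_act(self) -> Generator:
        yield


# Defining a concrete class without `matching_round` fails immediately.
try:
    class BadBehaviour(ToyBaseBehaviour):
        def async_act(self) -> Generator:
            yield

    error = None
except TypeError as exc:
    error = str(exc)
```

The benefit of this design is that a misconfigured behaviour is rejected when its module is imported, rather than later when the behaviour is first scheduled.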

BaseBehaviour Objects

class BaseBehaviour(
    AsyncBehaviour, IPFSBehaviour, CleanUpBehaviour, ABC, metaclass=_MetaBaseBehaviour)

This class represents the base class for FSM behaviours.

A behaviour is a state of the FSM App execution. It usually involves interactions between participants in the FSM App, although this is not enforced at this level of abstraction.

Concrete classes must set:

  • matching_round: the round class matching the behaviour.

Optionally, behaviour_id can be defined, although it is recommended to use the autogenerated id.

__init__

def __init__(**kwargs: Any)

Initialize a base behaviour.

auto_behaviour_id

@classmethod
def auto_behaviour_id(cls) -> str

Get behaviour id automatically.

This method returns the auto generated id from the class name if the class variable behaviour_id is not set on the child class. Otherwise, it returns the class variable behaviour_id.
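The fallback logic described above can be sketched as follows. The exact format of the autogenerated id is not stated here, so the snake_case conversion of the class name is an assumption, and `ToyBehaviour` and `camel_to_snake` are hypothetical names.

```python
import re
from typing import Optional


def camel_to_snake(name: str) -> str:
    """Insert '_' before every capital letter except the first, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


class ToyBehaviour:
    # Optional explicit override; left unset by default.
    behaviour_id: Optional[str] = None

    @classmethod
    def auto_behaviour_id(cls) -> str:
        if isinstance(cls.behaviour_id, str):
            return cls.behaviour_id  # explicitly set on the child class
        return camel_to_snake(cls.__name__)  # assumed autogenerated format


class PriceEstimationBehaviour(ToyBehaviour):
    pass


class CustomBehaviour(ToyBehaviour):
    behaviour_id = "my_custom_id"


auto_id = PriceEstimationBehaviour.auto_behaviour_id()
explicit_id = CustomBehaviour.auto_behaviour_id()
```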

behaviour_id

@property
def behaviour_id() -> str

Get behaviour id.

params

@property
def params() -> BaseParams

Return the params.

synchronized_data

@property
def synchronized_data() -> BaseSynchronizedData

Return the synchronized data.

tm_communication_unhealthy

@property
def tm_communication_unhealthy() -> bool

Return if the Tendermint communication is not healthy anymore.

check_in_round

def check_in_round(round_id: str) -> bool

Check that we entered a specific round.

check_in_last_round

def check_in_last_round(round_id: str) -> bool

Check that the last round was a specific round.

check_not_in_round

def check_not_in_round(round_id: str) -> bool

Check that we are not in a specific round.

check_not_in_last_round

def check_not_in_last_round(round_id: str) -> bool

Check that the last round was not a specific round.

check_round_has_finished

def check_round_has_finished(round_id: str) -> bool

Check that the round has finished.

check_round_height_has_changed

def check_round_height_has_changed(round_height: int) -> bool

Check that the round height has changed.

is_round_ended

def is_round_ended(round_id: str) -> Callable[[], bool]

Get a callable to check whether the current round has ended.
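Returning a callable instead of a boolean lets the check be re-evaluated later, for example on every tick of a polling loop. A minimal sketch of that idea, assuming the check simply compares round ids; `ToyRoundSequence`, `ToyBehaviour` and the round names are hypothetical:

```python
from typing import Callable


class ToyRoundSequence:
    """Hypothetical holder of the id of the round currently in progress."""

    def __init__(self, current_round_id: str) -> None:
        self.current_round_id = current_round_id


class ToyBehaviour:
    def __init__(self, round_sequence: ToyRoundSequence) -> None:
        self.round_sequence = round_sequence

    def check_not_in_round(self, round_id: str) -> bool:
        return self.round_sequence.current_round_id != round_id

    def is_round_ended(self, round_id: str) -> Callable[[], bool]:
        # The closure captures round_id and is evaluated lazily on each call.
        return lambda: self.check_not_in_round(round_id)


sequence = ToyRoundSequence("registration")
behaviour = ToyBehaviour(sequence)
registration_ended = behaviour.is_round_ended("registration")

before = registration_ended()  # still in the round
sequence.current_round_id = "price_estimation"
after = registration_ended()   # the round has been left
```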

wait_until_round_end

def wait_until_round_end(timeout: Optional[float] = None) -> Generator[None, None, None]

Wait until the ABCI application exits from a round.

Arguments:

  • timeout: the timeout for the wait

Yields:

None

wait_from_last_timestamp

def wait_from_last_timestamp(seconds: float) -> Any

Delay execution for a given number of seconds from the last timestamp.

The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • seconds: the seconds

Yields:

None

is_done

def is_done() -> bool

Check whether the behaviour is done.

set_done

def set_done() -> None

Set the behaviour to done.

send_a2a_transaction

def send_a2a_transaction(payload: BaseTxPayload, resetting: bool = False) -> Generator

Send transaction and wait for the response, repeating until it is successful.

Arguments:

  • payload: the payload to send.
  • resetting: flag indicating if we are resetting Tendermint nodes in this round.

Yields:

the responses
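The send-and-repeat pattern can be sketched with generators as shown below. This is only an illustration of the control flow under stated assumptions: `send_with_retry`, `make_flaky_submit` and `run` are hypothetical helpers, and the real method also signs the payload and inspects the responses, which is not modelled here.

```python
from typing import Any, Callable, Generator


def send_with_retry(
    submit: Callable[[Any], Generator[None, None, bool]], payload: Any
) -> Generator[None, None, int]:
    """Call `submit` until it reports success; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        success = yield from submit(payload)
        if success:
            return attempts
        yield  # give control back before trying again


def make_flaky_submit(failures: int) -> Callable[[Any], Generator[None, None, bool]]:
    """Build a fake submit function that fails `failures` times, then succeeds."""
    remaining = {"failures": failures}

    def submit(payload: Any) -> Generator[None, None, bool]:
        yield  # pretend to wait for the response
        if remaining["failures"] > 0:
            remaining["failures"] -= 1
            return False
        return True

    return submit


def run(gen: Generator) -> Any:
    """Drive a generator to completion and return its return value."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value


attempts_needed = run(send_with_retry(make_flaky_submit(2), b"payload"))
```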

async_act_wrapper

def async_act_wrapper() -> Generator

Do the act, supporting asynchronous execution.

num_active_peers

def num_active_peers(timeout: Optional[float] = None) -> Generator[None, None, Optional[int]]

Returns the number of active peers in the network.

get_callback_request

def get_callback_request() -> Callable[[Message, "BaseBehaviour"], None]

Wrapper for the callback request, whose behaviour depends on whether the message has been handled on time.

Returns:

the request callback.

get_http_response

def get_http_response(method: str, url: str, content: Optional[bytes] = None, headers: Optional[List[OrderedDict[str, str]]] = None, parameters: Optional[List[Tuple[str, str]]] = None) -> Generator[None, None, HttpMessage]

Send an http request message from the skill context.

This method is skill-specific, and therefore should not be used elsewhere.

Happy-path full flow of the messages.

_do_request:

  AbstractRoundAbci skill -> (HttpMessage | REQUEST) -> Http client connection
  Http client connection -> (HttpMessage | RESPONSE) -> AbstractRoundAbci skill

Arguments:

  • method: the http request method (e.g. 'GET' or 'POST').
  • url: the url to send the message to.
  • content: the payload.
  • headers: headers to be included.
  • parameters: url query parameters.

Yields:

HttpMessage object

Returns:

the http message
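Because the method is a generator, callers obtain its result with `yield from` inside their own `async_act`. The sketch below shows that calling convention with stand-ins only: `FakeHttpResponse`, the simplified `get_http_response`, the `run` driver, the URL and the JSON body are all hypothetical, and the real response is an HttpMessage.

```python
import json
from dataclasses import dataclass
from typing import Any, Generator, Optional


@dataclass
class FakeHttpResponse:
    """Hypothetical stand-in for the response message."""

    status_code: int
    body: bytes


class ToyBehaviour:
    def get_http_response(
        self, method: str, url: str
    ) -> Generator[None, None, FakeHttpResponse]:
        yield  # the real method would suspend here until the response arrives
        return FakeHttpResponse(200, b'{"price": 42}')

    def async_act(self) -> Generator[None, None, Optional[int]]:
        # `yield from` suspends this behaviour and later receives the result.
        response = yield from self.get_http_response(
            "GET", "https://example.org/price"
        )
        if response.status_code != 200:
            return None
        return json.loads(response.body)["price"]


def run(gen: Generator) -> Any:
    """Drive a generator to completion and return its return value."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value


price = run(ToyBehaviour().async_act())
```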

get_signature

def get_signature(message: bytes, is_deprecated_mode: bool = False) -> Generator[None, None, str]

Get signature for message.

Happy-path full flow of the messages.

_send_signing_request:

  AbstractRoundAbci skill -> (SigningMessage | SIGN_MESSAGE) -> DecisionMaker
  DecisionMaker -> (SigningMessage | SIGNED_MESSAGE) -> AbstractRoundAbci skill

Arguments:

  • message: message bytes
  • is_deprecated_mode: is deprecated mode flag

Yields:

SigningMessage object

Returns:

message signature

send_raw_transaction

def send_raw_transaction(transaction: RawTransaction) -> Generator[
        None,
        Union[None, SigningMessage, LedgerApiMessage],
        Tuple[Optional[str], RPCResponseStatus],
    ]

Send raw transactions to the ledger for mining.

Happy-path full flow of the messages.

_send_transaction_signing_request:

  AbstractRoundAbci skill -> (SigningMessage | SIGN_TRANSACTION) -> DecisionMaker
  DecisionMaker -> (SigningMessage | SIGNED_TRANSACTION) -> AbstractRoundAbci skill

_send_transaction_request:

  AbstractRoundAbci skill -> (LedgerApiMessage | SEND_SIGNED_TRANSACTION) -> Ledger connection
  Ledger connection -> (LedgerApiMessage | TRANSACTION_DIGEST) -> AbstractRoundAbci skill

Arguments:

  • transaction: transaction data

Yields:

SigningMessage object

Returns:

the transaction hash and the RPC response status

get_transaction_receipt

def get_transaction_receipt(tx_digest: str, retry_timeout: Optional[int] = None, retry_attempts: Optional[int] = None) -> Generator[None, None, Optional[Dict]]

Get transaction receipt.

Happy-path full flow of the messages.

_send_transaction_receipt_request:

  AbstractRoundAbci skill -> (LedgerApiMessage | GET_TRANSACTION_RECEIPT) -> Ledger connection
  Ledger connection -> (LedgerApiMessage | TRANSACTION_RECEIPT) -> AbstractRoundAbci skill

Arguments:

  • tx_digest: transaction digest received from raw transaction.
  • retry_timeout: retry timeout.
  • retry_attempts: number of retry attempts allowed.

Yields:

LedgerApiMessage object

Returns:

transaction receipt data

get_ledger_api_response

def get_ledger_api_response(performative: LedgerApiMessage.Performative, ledger_callable: str, **kwargs: Any) -> Generator[None, None, LedgerApiMessage]

Request data from the ledger api.

Happy-path full flow of the messages.

  AbstractRoundAbci skill -> (LedgerApiMessage | LedgerApiMessage.Performative) -> Ledger connection
  Ledger connection -> (LedgerApiMessage | LedgerApiMessage.Performative) -> AbstractRoundAbci skill

Arguments:

  • performative: the message performative
  • ledger_callable: the callable to call on the ledger api
  • kwargs: keyword arguments for the ledger api request

Returns:

the ledger api response

get_contract_api_response

def get_contract_api_response(performative: ContractApiMessage.Performative, contract_address: Optional[str], contract_id: str, contract_callable: str, **kwargs: Any) -> Generator[None, None, ContractApiMessage]

Request data from the contract api.

Happy-path full flow of the messages.

  AbstractRoundAbci skill -> (ContractApiMessage | ContractApiMessage.Performative) -> Ledger connection (contract dispatcher)
  Ledger connection (contract dispatcher) -> (ContractApiMessage | ContractApiMessage.Performative) -> AbstractRoundAbci skill

Arguments:

  • performative: the message performative
  • contract_address: the contract address
  • contract_id: the contract id
  • contract_callable: the callable to call on the contract
  • kwargs: keyword arguments for the contract api request

Returns:

the contract api response

request_recovery_params

def request_recovery_params() -> Generator[None, None, bool]

Request the Tendermint recovery parameters from the other agents via the ACN.

hard_reset_sleep

@property
def hard_reset_sleep() -> float

Amount of time to sleep before and after performing a hard reset.

We sleep for half the observation interval as there are no immediate transactions on either side of the reset.

Returns:

the amount of time to sleep in seconds

reset_tendermint_with_wait

def reset_tendermint_with_wait(on_startup: bool = False, is_recovery: bool = False) -> Generator[None, None, bool]

Performs a hard reset (unsafe-reset-all) on the Tendermint node.

Arguments:

  • on_startup: whether we are resetting on the start of the agent.
  • is_recovery: whether the reset is being performed to recover the agent <-> tm communication.

Yields:

None

Returns:

whether the reset was successful.

send_to_ipfs

def send_to_ipfs(filename: str, obj: SupportedObjectType, multiple: bool = False, filetype: Optional[SupportedFiletype] = None, custom_storer: Optional[CustomStorerType] = None, timeout: Optional[float] = None, **kwargs: Any) -> Generator[None, None, Optional[str]]

Store an object on IPFS.

Arguments:

  • filename: the file name to store obj in. If "multiple" is True, filename will be the name of the dir.
  • obj: the object(s) to serialize and store in IPFS as "filename".
  • multiple: whether obj should be stored as multiple files, i.e. directory.
  • filetype: the file type of the object being stored.
  • custom_storer: a custom serializer for "obj".
  • timeout: timeout for the request.

Returns:

the IPFS hash under which the object was stored, if any.

get_from_ipfs

def get_from_ipfs(ipfs_hash: str, filetype: Optional[SupportedFiletype] = None, custom_loader: CustomLoaderType = None, timeout: Optional[float] = None) -> Generator[None, None, Optional[SupportedObjectType]]

Gets an object from IPFS.

Arguments:

  • ipfs_hash: the ipfs hash of the file/dir to download.
  • filetype: the file type of the object being downloaded.
  • custom_loader: a custom deserializer for the object received from IPFS.
  • timeout: timeout for the request.

Returns:

the downloaded object, corresponding to ipfs_hash.

TmManager Objects

class TmManager(BaseBehaviour)

Utility class for managing the Tendermint node.

async_act

def async_act() -> Generator

The behaviour act.

is_acting

@property
def is_acting() -> bool

This method returns whether there is an active fix being applied.

hard_reset_sleep

@property
def hard_reset_sleep() -> float

Amount of time to sleep before and after performing a hard reset.

We don't need to wait for half the observation interval, like in normal cases where we perform a hard reset.

Returns:

the amount of time to sleep in seconds

get_callback_request

def get_callback_request() -> Callable[[Message, "BaseBehaviour"], None]

Wrapper for callback_request(), overridden to remove checks not applicable to TmManager.

try_fix

def try_fix() -> None

This method tries to fix an unhealthy Tendermint node.

DegenerateBehaviour Objects

class DegenerateBehaviour(BaseBehaviour, ABC)

An abstract matching behaviour for final and degenerate rounds.

async_act

def async_act() -> Generator

Exit the agent with error when a degenerate round is reached.

make_degenerate_behaviour

def make_degenerate_behaviour(round_cls: Type[AbstractRound]) -> Type[DegenerateBehaviour]

Make a degenerate behaviour class.
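A class factory of this kind can be sketched by defining a subclass inside a function and binding the given round class to it. The following is a hypothetical illustration, not the library's implementation: `ToyDegenerateBehaviour`, `ToyFinalRound` and the generated class name are assumptions.

```python
from abc import ABC
from typing import Any, Type


class ToyDegenerateBehaviour(ABC):
    """Hypothetical stand-in for DegenerateBehaviour."""

    matching_round: Any = None


class ToyFinalRound:
    """Hypothetical stand-in for a final or degenerate round class."""


def make_degenerate_behaviour(round_cls: type) -> Type[ToyDegenerateBehaviour]:
    """Create a new behaviour class bound to `round_cls`."""

    class NewDegenerateBehaviour(ToyDegenerateBehaviour):
        matching_round = round_cls

    # Give each generated class a distinguishable name (assumed convention).
    NewDegenerateBehaviour.__name__ = f"DegenerateBehaviour_{round_cls.__name__}"
    return NewDegenerateBehaviour


behaviour_cls = make_degenerate_behaviour(ToyFinalRound)
```

Each call returns a distinct class, so several final rounds can each get their own matching behaviour without hand-written boilerplate.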