Skip to content

packages.valory.skills.abstract_round_abci.behaviour_utils

This module contains helper classes for behaviours.

TM_REQ_TIMEOUT

5 seconds

SendException Objects

class SendException(Exception)

Exception raised if the 'try_send' to an AsyncBehaviour failed.

TimeoutException Objects

class TimeoutException(Exception)

Exception raised when a timeout during AsyncBehaviour occurs.

BaseBehaviourInternalError Objects

class BaseBehaviourInternalError(Exception)

Internal error due to a bad implementation of the BaseBehaviour.

__init__

def __init__(message: str, *args: Any) -> None

Initialize the error object.

AsyncBehaviour Objects

class AsyncBehaviour(ABC)

MixIn behaviour class that support limited asynchronous programming.

An AsyncBehaviour can be in three states: - READY: no suspended 'async_act' execution; - RUNNING: 'act' called, and waiting for a message - WAITING_TICK: 'act' called, and waiting for the next 'act' call

AsyncState Objects

class AsyncState(Enum)

Enumeration of AsyncBehaviour states.

__init__

def __init__() -> None

Initialize the async behaviour.

async_act

@abstractmethod
def async_act() -> Generator

Do the act, supporting asynchronous execution.

async_act_wrapper

@abstractmethod
def async_act_wrapper() -> Generator

Do the act, supporting asynchronous execution.

state

@property
def state() -> AsyncState

Get the 'async state'.

is_notified

@property
def is_notified() -> bool

Returns whether the behaviour has been notified about the arrival of a message.

received_message

@property
def received_message() -> Any

Returns the message the behaviour has received. "__message" should be None if not availble or already consumed.

is_stopped

@property
def is_stopped() -> bool

Check whether the behaviour has stopped.

try_send

def try_send(message: Any) -> None

Try to send a message to a waiting behaviour.

It will be sent only if the behaviour is actually waiting for a message, and it was not already notified.

Arguments:

  • message: a Python object.

Raises:

  • None: SendException if the behaviour was not waiting for a message, or if it was already notified.

wait_for_condition

@classmethod
def wait_for_condition(
        cls,
        condition: Callable[[], bool],
        timeout: Optional[float] = None) -> Generator[None, None, None]

Wait for a condition to happen.

This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • condition: the condition to wait for
  • timeout: the maximum amount of time to wait

Returns:

None

sleep

def sleep(seconds: float) -> Any

Delay execution for a given number of seconds.

The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • seconds: the seconds

Returns:

None

wait_for_message

def wait_for_message(condition: Callable = lambda message: True,
                     timeout: Optional[float] = None) -> Any

Wait for message.

Care must be taken. This method does not handle concurrent requests. Use directly after a request is being sent. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • condition: a callable
  • timeout: max time to wait (in seconds)

Returns:

a message

setup

def setup() -> None

Setup behaviour.

act

def act() -> None

Do the act.

stop

def stop() -> None

Stop the execution of the behaviour.

IPFSBehaviour Objects

class IPFSBehaviour(SimpleBehaviour, ABC)

Behaviour for interactions with IPFS.

__init__

def __init__(**kwargs: Any)

Initialize an IPFSBehaviour.

CleanUpBehaviour Objects

class CleanUpBehaviour(SimpleBehaviour, ABC)

Class for clean-up related functionality of behaviours.

__init__

def __init__(**kwargs: Any)

Initialize a base behaviour.

clean_up

def clean_up() -> None

Clean up the resources due to a 'stop' event.

It can be optionally implemented by the concrete classes.

handle_late_messages

def handle_late_messages(behaviour_id: str, message: Message) -> None

Handle late arriving messages.

Runs from another behaviour, even if the behaviour implementing the method has been exited. It can be optionally implemented by the concrete classes.

Arguments:

  • behaviour_id: the id of the behaviour in which the message belongs to.
  • message: the late arriving message to handle.

RPCResponseStatus Objects

class RPCResponseStatus(Enum)

A custom status of an RPC response.

_MetaBaseBehaviour Objects

class _MetaBaseBehaviour(ABCMeta)

A metaclass that validates BaseBehaviour's attributes.

__new__

def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
            **kwargs: Any) -> Type

Initialize the class.

BaseBehaviour Objects

class BaseBehaviour(AsyncBehaviour,
                    IPFSBehaviour,
                    CleanUpBehaviour,
                    ABC,
                    metaclass=_MetaBaseBehaviour)

This class represents the base class for FSM behaviours

A behaviour is a state of the FSM App execution. It usually involves interactions between participants in the FSM App, although this is not enforced at this level of abstraction.

Concrete classes must set: - matching_round: the round class matching the behaviour;

Optionally, behaviour_id can be defined, although it is recommended to use the autogenerated id.

__init__

def __init__(**kwargs: Any)

Initialize a base behaviour.

auto_behaviour_id

@classmethod
def auto_behaviour_id(cls) -> str

Get behaviour id automatically.

This method returns the auto generated id from the class name if the class variable behaviour_id is not set on the child class. Otherwise, it returns the class variable behaviour_id.

behaviour_id

@property
def behaviour_id() -> str

Get behaviour id.

params

@property
def params() -> BaseParams

Return the params.

shared_state

@property
def shared_state() -> SharedState

Return the round sequence.

round_sequence

@property
def round_sequence() -> RoundSequence

Return the round sequence.

synchronized_data

@property
def synchronized_data() -> BaseSynchronizedData

Return the synchronized data.

tm_communication_unhealthy

@property
def tm_communication_unhealthy() -> bool

Return if the Tendermint communication is not healthy anymore.

check_in_round

def check_in_round(round_id: str) -> bool

Check that we entered a specific round.

check_in_last_round

def check_in_last_round(round_id: str) -> bool

Check that we entered a specific round.

check_not_in_round

def check_not_in_round(round_id: str) -> bool

Check that we are not in a specific round.

check_not_in_last_round

def check_not_in_last_round(round_id: str) -> bool

Check that we are not in a specific round.

check_round_has_finished

def check_round_has_finished(round_id: str) -> bool

Check that the round has finished.

check_round_height_has_changed

def check_round_height_has_changed(round_height: int) -> bool

Check that the round height has changed.

is_round_ended

def is_round_ended(round_id: str) -> Callable[[], bool]

Get a callable to check whether the current round has ended.

wait_until_round_end

def wait_until_round_end(
        timeout: Optional[float] = None) -> Generator[None, None, None]

Wait until the ABCI application exits from a round.

Arguments:

  • timeout: the timeout for the wait

Returns:

None

wait_from_last_timestamp

def wait_from_last_timestamp(seconds: float) -> Any

Delay execution for a given number of seconds from the last timestamp.

The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.

Arguments:

  • seconds: the seconds

Returns:

None

is_done

def is_done() -> bool

Check whether the behaviour is done.

set_done

def set_done() -> None

Set the behaviour to done.

send_a2a_transaction

def send_a2a_transaction(payload: BaseTxPayload,
                         resetting: bool = False) -> Generator

Send transaction and wait for the response, and repeat until not successful.

:param: payload: the payload to send :param: resetting: flag indicating if we are resetting Tendermint nodes in this round. :yield: the responses

async_act_wrapper

def async_act_wrapper() -> Generator

Do the act, supporting asynchronous execution.

num_active_peers

def num_active_peers(
        timeout: Optional[float] = None
) -> Generator[None, None, Optional[int]]

Returns the number of active peers in the network.

get_callback_request

def get_callback_request() -> Callable[[Message, "BaseBehaviour"], None]

Wrapper for callback request which depends on whether the message has not been handled on time.

Returns:

the request callback.

get_http_response

def get_http_response(
    method: str,
    url: str,
    content: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
    parameters: Optional[Dict[str, str]] = None
) -> Generator[None, None, HttpMessage]

Send an http request message from the skill context.

This method is skill-specific, and therefore should not be used elsewhere.

Happy-path full flow of the messages.

_do_request: AbstractRoundAbci skill -> (HttpMessage | REQUEST) -> Http client connection Http client connection -> (HttpMessage | RESPONSE) -> AbstractRoundAbci skill

Arguments:

  • method: the http request method (i.e. 'GET' or 'POST').
  • url: the url to send the message to.
  • content: the payload.
  • headers: headers to be included.
  • parameters: url query parameters.

Returns:

HttpMessage object

get_signature

def get_signature(
        message: bytes,
        is_deprecated_mode: bool = False) -> Generator[None, None, str]

Get signature for message.

Happy-path full flow of the messages.

_send_signing_request: AbstractRoundAbci skill -> (SigningMessage | SIGN_MESSAGE) -> DecisionMaker DecisionMaker -> (SigningMessage | SIGNED_MESSAGE) -> AbstractRoundAbci skill

Arguments:

  • message: message bytes
  • is_deprecated_mode: is deprecated mode flag

Returns:

SigningMessage object

send_raw_transaction

def send_raw_transaction(
    transaction: RawTransaction,
    use_flashbots: bool = False,
    target_block_numbers: Optional[List[int]] = None,
    raise_on_failed_simulation: bool = False,
    chain_id: Optional[str] = None
) -> Generator[
        None,
        Union[None, SigningMessage, LedgerApiMessage],
        Tuple[Optional[str], RPCResponseStatus],
]

Send raw transactions to the ledger for mining.

Happy-path full flow of the messages.

_send_transaction_signing_request: AbstractRoundAbci skill -> (SigningMessage | SIGN_TRANSACTION) -> DecisionMaker DecisionMaker -> (SigningMessage | SIGNED_TRANSACTION) -> AbstractRoundAbci skill

_send_transaction_request: AbstractRoundAbci skill -> (LedgerApiMessage | SEND_SIGNED_TRANSACTION) -> Ledger connection Ledger connection -> (LedgerApiMessage | TRANSACTION_DIGEST) -> AbstractRoundAbci skill

Arguments:

  • transaction: transaction data
  • use_flashbots: whether to use flashbots for the transaction or not
  • target_block_numbers: the target block numbers in case we are using flashbots
  • raise_on_failed_simulation: whether to raise an exception if the transaction fails the simulation or not
  • chain_id: the chain name to use for the ledger call

Returns:

SigningMessage object

get_transaction_receipt

def get_transaction_receipt(
    tx_digest: str,
    retry_timeout: Optional[int] = None,
    retry_attempts: Optional[int] = None
) -> Generator[None, None, Optional[Dict]]

Get transaction receipt.

Happy-path full flow of the messages.

_send_transaction_receipt_request: AbstractRoundAbci skill -> (LedgerApiMessage | GET_TRANSACTION_RECEIPT) -> Ledger connection Ledger connection -> (LedgerApiMessage | TRANSACTION_RECEIPT) -> AbstractRoundAbci skill

Arguments:

  • tx_digest: transaction digest received from raw transaction.
  • retry_timeout: retry timeout.
  • retry_attempts: number of retry attempts allowed.

Returns:

LedgerApiMessage object

get_ledger_api_response

def get_ledger_api_response(
        performative: LedgerApiMessage.Performative, ledger_callable: str,
        **kwargs: Any) -> Generator[None, None, LedgerApiMessage]

Request data from ledger api

Happy-path full flow of the messages.

AbstractRoundAbci skill -> (LedgerApiMessage | LedgerApiMessage.Performative) -> Ledger connection Ledger connection -> (LedgerApiMessage | LedgerApiMessage.Performative) -> AbstractRoundAbci skill

Arguments:

  • performative: the message performative
  • ledger_callable: the callable to call on the contract
  • kwargs: keyword argument for the contract api request

Returns:

the contract api response

get_contract_api_response

def get_contract_api_response(
        performative: ContractApiMessage.Performative,
        contract_address: Optional[str],
        contract_id: str,
        contract_callable: str,
        ledger_id: Optional[str] = None,
        **kwargs: Any) -> Generator[None, None, ContractApiMessage]

Request contract safe transaction hash

Happy-path full flow of the messages.

AbstractRoundAbci skill -> (ContractApiMessage | ContractApiMessage.Performative) -> Ledger connection (contract dispatcher) Ledger connection (contract dispatcher) -> (ContractApiMessage | ContractApiMessage.Performative) -> AbstractRoundAbci skill

Arguments:

  • performative: the message performative
  • contract_address: the contract address
  • contract_id: the contract id
  • contract_callable: the callable to call on the contract
  • ledger_id: the ledger id, if not specified, the default ledger id is used
  • kwargs: keyword argument for the contract api request

Returns:

the contract api response

request_recovery_params

def request_recovery_params(should_log: bool) -> Generator[None, None, bool]

Request the Tendermint recovery parameters from the other agents via the ACN.

hard_reset_sleep

@property
def hard_reset_sleep() -> float

Amount of time to sleep before and after performing a hard reset.

We sleep for half the reset pause duration as there are no immediate transactions on either side of the reset.

Returns:

the amount of time to sleep in seconds

reset_tendermint_with_wait

def reset_tendermint_with_wait(
        on_startup: bool = False,
        is_recovery: bool = False) -> Generator[None, None, bool]

Performs a hard reset (unsafe-reset-all) on the tendermint node.

Arguments:

  • on_startup: whether we are resetting on the start of the agent.
  • is_recovery: whether the reset is being performed to recover the agent <-> tm communication.

Returns:

None

send_to_ipfs

def send_to_ipfs(filename: str,
                 obj: SupportedObjectType,
                 multiple: bool = False,
                 filetype: Optional[SupportedFiletype] = None,
                 custom_storer: Optional[CustomStorerType] = None,
                 timeout: Optional[float] = None,
                 **kwargs: Any) -> Generator[None, None, Optional[str]]

Store an object on IPFS.

Arguments:

  • filename: the file name to store obj in. If "multiple" is True, filename will be the name of the dir.
  • obj: the object(s) to serialize and store in IPFS as "filename".
  • multiple: whether obj should be stored as multiple files, i.e. directory.
  • filetype: the file type of the object being downloaded.
  • custom_storer: a custom serializer for "obj".
  • timeout: timeout for the request.

Returns:

the downloaded object, corresponding to ipfs_hash.

get_from_ipfs

def get_from_ipfs(
    ipfs_hash: str,
    filetype: Optional[SupportedFiletype] = None,
    custom_loader: CustomLoaderType = None,
    timeout: Optional[float] = None
) -> Generator[None, None, Optional[SupportedObjectType]]

Gets an object from IPFS.

Arguments:

  • ipfs_hash: the ipfs hash of the file/dir to download.
  • filetype: the file type of the object being downloaded.
  • custom_loader: a custom deserializer for the object received from IPFS.
  • timeout: timeout for the request.

Returns:

the downloaded object, corresponding to ipfs_hash.

TmManager Objects

class TmManager(BaseBehaviour)

Util class to be used for managing the tendermint node.

__init__

def __init__(**kwargs: Any)

Initialize the TmManager.

async_act

def async_act() -> Generator

The behaviour act.

is_acting

@property
def is_acting() -> bool

This method returns whether there is an active fix being applied.

hard_reset_sleep

@property
def hard_reset_sleep() -> float

Amount of time to sleep before and after performing a hard reset.

We don't need to wait for half the reset pause duration, like in normal cases where we perform a hard reset.

Returns:

the amount of time to sleep in seconds

get_callback_request

def get_callback_request() -> Callable[[Message, "BaseBehaviour"], None]

Wrapper for callback_request(), overridden to remove checks not applicable to TmManager.

try_fix

def try_fix() -> None

This method tries to fix an unhealthy tendermint node.

DegenerateBehaviour Objects

class DegenerateBehaviour(BaseBehaviour, ABC)

An abstract matching behaviour for final and degenerate rounds.

async_act

def async_act() -> Generator

Exit the agent with error when a degenerate round is reached.

make_degenerate_behaviour

def make_degenerate_behaviour(
        round_cls: Type[AbstractRound]) -> Type[DegenerateBehaviour]

Make a degenerate behaviour class.