packages.valory.skills.abstract_round_abci.behaviour_utils
This module contains helper classes for behaviours.
TM_REQ_TIMEOUT
5 seconds
SendException Objects
class SendException(Exception)
Exception raised if the 'try_send' to an AsyncBehaviour failed.
TimeoutException Objects
class TimeoutException(Exception)
Exception raised when a timeout during AsyncBehaviour occurs.
BaseBehaviourInternalError Objects
class BaseBehaviourInternalError(Exception)
Internal error due to a bad implementation of the BaseBehaviour.
__init__
def __init__(message: str, *args: Any) -> None
Initialize the error object.
AsyncBehaviour Objects
class AsyncBehaviour(ABC)
MixIn behaviour class that supports limited asynchronous programming.
An AsyncBehaviour can be in three states:
- READY: no suspended 'async_act' execution;
- RUNNING: 'act' called, and waiting for a message;
- WAITING_TICK: 'act' called, and waiting for the next 'act' call.
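As a rough illustration of the states listed above, the following toy class drives a generator one step per 'act' call. ToyAsyncBehaviour, State and the log attribute are hypothetical stand-ins, not the real AsyncBehaviour; the sketch only exercises READY and WAITING_TICK, and assumes that each bare yield hands control back until the next tick.

```python
from enum import Enum
from typing import Generator, List, Optional


class State(Enum):
    """Toy copy of the three documented states."""

    READY = "ready"
    RUNNING = "running"            # waiting for a message (unused in this sketch)
    WAITING_TICK = "waiting_tick"  # suspended until the next act() call


class ToyAsyncBehaviour:
    """Hypothetical stand-in, not the real AsyncBehaviour."""

    def __init__(self) -> None:
        self.state = State.READY
        self._gen: Optional[Generator] = None
        self.log: List[str] = []

    def async_act(self) -> Generator:
        self.log.append("step 1")
        yield  # hand control back until the next tick
        self.log.append("step 2")

    def act(self) -> None:
        if self.state is State.READY:
            # No suspended execution: start a fresh async_act generator.
            self._gen = self.async_act()
            self.state = State.WAITING_TICK
        try:
            next(self._gen)
        except StopIteration:
            # The generator ran to completion, so nothing is suspended.
            self.state = State.READY


behaviour = ToyAsyncBehaviour()
behaviour.act()  # runs up to the first yield
behaviour.act()  # resumes and finishes
```

After the first call the toy behaviour is suspended in WAITING_TICK; the second call completes the generator and returns it to READY.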
AsyncState Objects
class AsyncState(Enum)
Enumeration of AsyncBehaviour states.
__init__
def __init__() -> None
Initialize the async behaviour.
async_act
@abstractmethod
def async_act() -> Generator
Do the act, supporting asynchronous execution.
async_act_wrapper
@abstractmethod
def async_act_wrapper() -> Generator
Do the act, supporting asynchronous execution.
state
@property
def state() -> AsyncState
Get the 'async state'.
is_notified
@property
def is_notified() -> bool
Returns whether the behaviour has been notified about the arrival of a message.
received_message
@property
def received_message() -> Any
Returns the message the behaviour has received. "__message" should be None if not available or already consumed.
is_stopped
@property
def is_stopped() -> bool
Check whether the behaviour has stopped.
try_send
def try_send(message: Any) -> None
Try to send a message to a waiting behaviour.
It will be sent only if the behaviour is actually waiting for a message, and it was not already notified.
Arguments:
message: a Python object.
Raises:
SendException: if the behaviour was not waiting for a message, or if it was already notified.
wait_for_condition
@classmethod
def wait_for_condition(
cls,
condition: Callable[[], bool],
timeout: Optional[float] = None) -> Generator[None, None, None]
Wait for a condition to happen.
This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.
Arguments:
condition: the condition to wait for
timeout: the maximum amount of time to wait
Returns:
None
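The waiting pattern described above can be sketched as a plain generator that yields until the condition holds. This is a minimal sketch, not the module's implementation: toy_wait_for_condition and ToyTimeout are hypothetical names, and raising on an expired deadline is an assumption based on the TimeoutException description earlier in this page.

```python
from datetime import datetime, timedelta
from typing import Callable, Generator, Optional


class ToyTimeout(Exception):
    """Hypothetical stand-in for TimeoutException."""


def toy_wait_for_condition(
    condition: Callable[[], bool],
    timeout: Optional[float] = None,
) -> Generator[None, None, None]:
    """Yield until condition() is true; raise ToyTimeout once the deadline passes."""
    # The deadline uses the local clock only, as the text above permits.
    deadline = None if timeout is None else datetime.now() + timedelta(seconds=timeout)
    while not condition():
        if deadline is not None and datetime.now() > deadline:
            raise ToyTimeout()
        yield  # give control back; the caller resumes us on the next tick


flags = []
waiter = toy_wait_for_condition(lambda: bool(flags))
next(waiter)      # condition is false, so the generator suspends
flags.append(1)   # the condition becomes true
finished = next(waiter, "done") == "done"  # the generator now completes
```

Each yield is a point where the caller can do other work before resuming the generator.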
sleep
def sleep(seconds: float) -> Any
Delay execution for a given number of seconds.
The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.
Arguments:
seconds: the number of seconds to sleep
Returns:
None
wait_for_message
def wait_for_message(condition: Callable = lambda message: True,
timeout: Optional[float] = None) -> Any
Wait for message.
Care must be taken: this method does not handle concurrent requests. Use it directly after a request has been sent. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.
Arguments:
condition: a callable
timeout: max time to wait (in seconds)
Returns:
a message
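To make the message-waiting idea concrete, here is a small sketch that relies only on Python's generator send() protocol. The name toy_wait_for_message is hypothetical, the timeout is left out for brevity, and the real method may manage internal state that this sketch ignores.

```python
from typing import Any, Callable, Generator


def toy_wait_for_message(
    condition: Callable[[Any], bool] = lambda message: True,
) -> Generator[None, Any, Any]:
    """Suspend until a message satisfying `condition` is sent in, then return it."""
    message = None
    while message is None or not condition(message):
        message = yield  # whatever the caller passes to send() lands here
    return message


waiter = toy_wait_for_message(lambda m: m == "pong")
next(waiter)          # prime the generator so it reaches the first yield
waiter.send("ping")   # does not satisfy the condition, keeps waiting
try:
    waiter.send("pong")
    received = None
except StopIteration as stop:
    received = stop.value  # the generator's return value
```

Because a single generator holds a single pending wait, this pattern has no way to tell apart replies to two overlapping requests, which matches the caution above about concurrent requests.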
setup
def setup() -> None
Setup behaviour.
act
def act() -> None
Do the act.
stop
def stop() -> None
Stop the execution of the behaviour.
IPFSBehaviour Objects
class IPFSBehaviour(SimpleBehaviour, ABC)
Behaviour for interactions with IPFS.
__init__
def __init__(**kwargs: Any)
Initialize an IPFSBehaviour.
CleanUpBehaviour Objects
class CleanUpBehaviour(SimpleBehaviour, ABC)
Class for clean-up related functionality of behaviours.
__init__
def __init__(**kwargs: Any)
Initialize a base behaviour.
clean_up
def clean_up() -> None
Clean up the resources due to a 'stop' event.
It can be optionally implemented by the concrete classes.
handle_late_messages
def handle_late_messages(behaviour_id: str, message: Message) -> None
Handle late arriving messages.
Runs from another behaviour, even if the behaviour implementing the method has been exited. It can be optionally implemented by the concrete classes.
Arguments:
behaviour_id: the id of the behaviour to which the message belongs.
message: the late arriving message to handle.
RPCResponseStatus Objects
class RPCResponseStatus(Enum)
A custom status of an RPC response.
_MetaBaseBehaviour Objects
class _MetaBaseBehaviour(ABCMeta)
A metaclass that validates BaseBehaviour's attributes.
__new__
def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
**kwargs: Any) -> Type
Initialize the class.
BaseBehaviour Objects
class BaseBehaviour(AsyncBehaviour,
IPFSBehaviour,
CleanUpBehaviour,
ABC,
metaclass=_MetaBaseBehaviour)
This class represents the base class for FSM behaviours.
A behaviour is a state of the FSM App execution. It usually involves interactions between participants in the FSM App, although this is not enforced at this level of abstraction.
Concrete classes must set:
- matching_round: the round class matching the behaviour.
Optionally, behaviour_id can be defined, although it is recommended to use the autogenerated id.
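The requirement that concrete classes set matching_round, checked by a metaclass, might look roughly like the sketch below. ToyMeta, ToyBase and ToyInternalError are hypothetical names, and skipping classes that list ABC directly among their bases is an assumption about how abstract helpers are exempted; the actual checks in _MetaBaseBehaviour may differ.

```python
from abc import ABC, ABCMeta
from typing import Any, Dict, Tuple, Type


class ToyInternalError(Exception):
    """Hypothetical stand-in for BaseBehaviourInternalError."""


class ToyMeta(ABCMeta):
    """Validate class attributes at class-creation time."""

    def __new__(mcs, name: str, bases: Tuple, namespace: Dict, **kwargs: Any) -> Type:
        new_cls = super().__new__(mcs, name, bases, namespace, **kwargs)
        if ABC in bases:
            # Assumption: classes that inherit ABC directly are abstract helpers.
            return new_cls
        if not hasattr(new_cls, "matching_round"):
            raise ToyInternalError(f"'matching_round' not set on {name}")
        return new_cls


class ToyBase(ABC, metaclass=ToyMeta):
    """Abstract base: not validated because ABC is among its bases."""


class GoodBehaviour(ToyBase):
    matching_round = object  # placeholder for a round class


try:
    class BadBehaviour(ToyBase):  # no matching_round: rejected at definition time
        pass
    definition_failed = False
except ToyInternalError:
    definition_failed = True
```

Validating in the metaclass surfaces the mistake when the class is defined rather than when the behaviour first runs.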
__init__
def __init__(**kwargs: Any)
Initialize a base behaviour.
auto_behaviour_id
@classmethod
def auto_behaviour_id(cls) -> str
Get behaviour id automatically.
This method returns the auto generated id from the class name if the class variable behaviour_id is not set on the child class. Otherwise, it returns the class variable behaviour_id.
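The fallback described above can be sketched as follows. This is an illustration under assumptions: the text does not say how the id is derived from the class name, so the CamelCase to snake_case conversion, the helper camel_to_snake and the ToyBehaviour classes are all hypothetical.

```python
import re
from typing import Optional


def camel_to_snake(name: str) -> str:
    """Insert '_' before every capital letter except the first, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


class ToyBehaviour:
    """Hypothetical stand-in for a behaviour base class."""

    behaviour_id: Optional[str] = None  # may be overridden by child classes

    @classmethod
    def auto_behaviour_id(cls) -> str:
        # Prefer an explicitly set class variable, else derive from the name.
        if isinstance(cls.behaviour_id, str):
            return cls.behaviour_id
        return camel_to_snake(cls.__name__)


class RegistrationBehaviour(ToyBehaviour):
    pass


class CustomBehaviour(ToyBehaviour):
    behaviour_id = "my_custom_id"


auto_id = RegistrationBehaviour.auto_behaviour_id()
custom_id = CustomBehaviour.auto_behaviour_id()
```

Here auto_id is derived from the class name, while custom_id is the explicitly set value.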
behaviour_id
@property
def behaviour_id() -> str
Get behaviour id.
params
@property
def params() -> BaseParams
Return the params.
shared_state
@property
def shared_state() -> SharedState
Return the shared state.
round_sequence
@property
def round_sequence() -> RoundSequence
Return the round sequence.
synchronized_data
@property
def synchronized_data() -> BaseSynchronizedData
Return the synchronized data.
tm_communication_unhealthy
@property
def tm_communication_unhealthy() -> bool
Return whether the Tendermint communication is no longer healthy.
check_in_round
def check_in_round(round_id: str) -> bool
Check that we entered a specific round.
check_in_last_round
def check_in_last_round(round_id: str) -> bool
Check that the last round was a specific round.
check_not_in_round
def check_not_in_round(round_id: str) -> bool
Check that we are not in a specific round.
check_not_in_last_round
def check_not_in_last_round(round_id: str) -> bool
Check that the last round was not a specific round.
check_round_has_finished
def check_round_has_finished(round_id: str) -> bool
Check that the round has finished.
check_round_height_has_changed
def check_round_height_has_changed(round_height: int) -> bool
Check that the round height has changed.
is_round_ended
def is_round_ended(round_id: str) -> Callable[[], bool]
Get a callable to check whether the current round has ended.
wait_until_round_end
def wait_until_round_end(
timeout: Optional[float] = None) -> Generator[None, None, None]
Wait until the ABCI application exits from a round.
Arguments:
timeout: the timeout for the wait
Returns:
None
wait_from_last_timestamp
def wait_from_last_timestamp(seconds: float) -> Any
Delay execution for a given number of seconds from the last timestamp.
The argument may be a floating point number for subsecond precision. This is a local method that does not depend on the global clock, so the usage of datetime.now() is acceptable here.
Arguments:
seconds: the number of seconds to wait, counted from the last timestamp
Returns:
None
is_done
def is_done() -> bool
Check whether the behaviour is done.
set_done
def set_done() -> None
Set the behaviour to done.
send_a2a_transaction
def send_a2a_transaction(payload: BaseTxPayload,
resetting: bool = False) -> Generator
Send transaction and wait for the response, and repeat until successful.
Arguments:
payload: the payload to send
resetting: flag indicating if we are resetting Tendermint nodes in this round.
Yields:
the responses
async_act_wrapper
def async_act_wrapper() -> Generator
Do the act, supporting asynchronous execution.
num_active_peers
def num_active_peers(
timeout: Optional[float] = None
) -> Generator[None, None, Optional[int]]
Returns the number of active peers in the network.
get_callback_request
def get_callback_request() -> Callable[[Message, "BaseBehaviour"], None]
Wrapper for the callback request, whose behaviour depends on whether the message was handled on time.
Returns:
the request callback.
get_http_response
def get_http_response(
method: str,
url: str,
content: Optional[bytes] = None,
headers: Optional[Dict[str, str]] = None,
parameters: Optional[Dict[str, str]] = None
) -> Generator[None, None, HttpMessage]
Send an http request message from the skill context.
This method is skill-specific, and therefore should not be used elsewhere.
Happy-path full flow of the messages.
_do_request:
AbstractRoundAbci skill -> (HttpMessage | REQUEST) -> Http client connection
Http client connection -> (HttpMessage | RESPONSE) -> AbstractRoundAbci skill
Arguments:
method: the http request method (e.g. 'GET' or 'POST').
url: the url to send the message to.
content: the payload.
headers: headers to be included.
parameters: url query parameters.
Returns:
HttpMessage object
get_signature
def get_signature(
message: bytes,
is_deprecated_mode: bool = False) -> Generator[None, None, str]
Get signature for message.
Happy-path full flow of the messages.
_send_signing_request:
AbstractRoundAbci skill -> (SigningMessage | SIGN_MESSAGE) -> DecisionMaker
DecisionMaker -> (SigningMessage | SIGNED_MESSAGE) -> AbstractRoundAbci skill
Arguments:
message: message bytes
is_deprecated_mode: is deprecated mode flag
Returns:
the message signature (a string)
send_raw_transaction
def send_raw_transaction(
transaction: RawTransaction,
use_flashbots: bool = False,
target_block_numbers: Optional[List[int]] = None,
raise_on_failed_simulation: bool = False,
chain_id: Optional[str] = None
) -> Generator[
None,
Union[None, SigningMessage, LedgerApiMessage],
Tuple[Optional[str], RPCResponseStatus],
]
Send raw transactions to the ledger for mining.
Happy-path full flow of the messages.
_send_transaction_signing_request:
AbstractRoundAbci skill -> (SigningMessage | SIGN_TRANSACTION) -> DecisionMaker
DecisionMaker -> (SigningMessage | SIGNED_TRANSACTION) -> AbstractRoundAbci skill
_send_transaction_request:
AbstractRoundAbci skill -> (LedgerApiMessage | SEND_SIGNED_TRANSACTION) -> Ledger connection
Ledger connection -> (LedgerApiMessage | TRANSACTION_DIGEST) -> AbstractRoundAbci skill
Arguments:
transaction: transaction data
use_flashbots: whether to use flashbots for the transaction or not
target_block_numbers: the target block numbers in case we are using flashbots
raise_on_failed_simulation: whether to raise an exception if the transaction fails the simulation or not
chain_id: the chain name to use for the ledger call
Returns:
a tuple of the transaction digest, if any, and the RPCResponseStatus
get_transaction_receipt
def get_transaction_receipt(
tx_digest: str,
retry_timeout: Optional[int] = None,
retry_attempts: Optional[int] = None
) -> Generator[None, None, Optional[Dict]]
Get transaction receipt.
Happy-path full flow of the messages.
_send_transaction_receipt_request:
AbstractRoundAbci skill -> (LedgerApiMessage | GET_TRANSACTION_RECEIPT) -> Ledger connection
Ledger connection -> (LedgerApiMessage | TRANSACTION_RECEIPT) -> AbstractRoundAbci skill
Arguments:
tx_digest: transaction digest received from raw transaction.
retry_timeout: retry timeout.
retry_attempts: number of retry attempts allowed.
Returns:
the transaction receipt data as a dictionary, or None
get_ledger_api_response
def get_ledger_api_response(
performative: LedgerApiMessage.Performative, ledger_callable: str,
**kwargs: Any) -> Generator[None, None, LedgerApiMessage]
Request data from the ledger api.
Happy-path full flow of the messages.
AbstractRoundAbci skill -> (LedgerApiMessage | LedgerApiMessage.Performative) -> Ledger connection
Ledger connection -> (LedgerApiMessage | LedgerApiMessage.Performative) -> AbstractRoundAbci skill
Arguments:
performative: the message performative
ledger_callable: the callable to call on the ledger api
kwargs: keyword arguments for the ledger api request
Returns:
the ledger api response
get_contract_api_response
def get_contract_api_response(
performative: ContractApiMessage.Performative,
contract_address: Optional[str],
contract_id: str,
contract_callable: str,
ledger_id: Optional[str] = None,
**kwargs: Any) -> Generator[None, None, ContractApiMessage]
Request contract safe transaction hash
Happy-path full flow of the messages.
AbstractRoundAbci skill -> (ContractApiMessage | ContractApiMessage.Performative) -> Ledger connection (contract dispatcher)
Ledger connection (contract dispatcher) -> (ContractApiMessage | ContractApiMessage.Performative) -> AbstractRoundAbci skill
Arguments:
performative: the message performative
contract_address: the contract address
contract_id: the contract id
contract_callable: the callable to call on the contract
ledger_id: the ledger id; if not specified, the default ledger id is used
kwargs: keyword arguments for the contract api request
Returns:
the contract api response
request_recovery_params
def request_recovery_params(should_log: bool) -> Generator[None, None, bool]
Request the Tendermint recovery parameters from the other agents via the ACN.
hard_reset_sleep
@property
def hard_reset_sleep() -> float
Amount of time to sleep before and after performing a hard reset.
We sleep for half the reset pause duration as there are no immediate transactions on either side of the reset.
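The arithmetic behind this property can be sketched in a few lines. ToyParams and its reset_pause_duration field are hypothetical names chosen for illustration; the text above only states that the sleep is half of the reset pause duration.

```python
from dataclasses import dataclass


@dataclass
class ToyParams:
    """Hypothetical parameters object; the field name is an assumption."""

    reset_pause_duration: float  # seconds


def toy_hard_reset_sleep(params: ToyParams) -> float:
    """Half the pause, so sleeping before and after the reset spans the full pause."""
    return params.reset_pause_duration / 2


params = ToyParams(reset_pause_duration=10.0)
sleep_each_side = toy_hard_reset_sleep(params)
total_sleep = 2 * sleep_each_side  # one sleep before the reset, one after
```

With a 10 second pause, the behaviour would sleep 5 seconds on each side of the reset.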
Returns:
the amount of time to sleep in seconds
reset_tendermint_with_wait
def reset_tendermint_with_wait(
on_startup: bool = False,
is_recovery: bool = False) -> Generator[None, None, bool]
Performs a hard reset (unsafe-reset-all) on the tendermint node.
Arguments:
on_startup: whether we are resetting on the start of the agent.
is_recovery: whether the reset is being performed to recover the agent <-> tm communication.
Returns:
a boolean, presumably indicating whether the reset was successful
send_to_ipfs
def send_to_ipfs(filename: str,
obj: SupportedObjectType,
multiple: bool = False,
filetype: Optional[SupportedFiletype] = None,
custom_storer: Optional[CustomStorerType] = None,
timeout: Optional[float] = None,
**kwargs: Any) -> Generator[None, None, Optional[str]]
Store an object on IPFS.
Arguments:
filename: the file name to store obj in. If "multiple" is True, filename will be the name of the dir.
obj: the object(s) to serialize and store in IPFS as "filename".
multiple: whether obj should be stored as multiple files, i.e. directory.
filetype: the file type of the object being stored.
custom_storer: a custom serializer for "obj".
timeout: timeout for the request.
Returns:
the IPFS hash of the stored object, or None.
get_from_ipfs
def get_from_ipfs(
ipfs_hash: str,
filetype: Optional[SupportedFiletype] = None,
custom_loader: CustomLoaderType = None,
timeout: Optional[float] = None
) -> Generator[None, None, Optional[SupportedObjectType]]
Gets an object from IPFS.
Arguments:
ipfs_hash: the ipfs hash of the file/dir to download.
filetype: the file type of the object being downloaded.
custom_loader: a custom deserializer for the object received from IPFS.
timeout: timeout for the request.
Returns:
the downloaded object, corresponding to ipfs_hash.
TmManager Objects
class TmManager(BaseBehaviour)
Util class to be used for managing the tendermint node.
__init__
def __init__(**kwargs: Any)
Initialize the TmManager.
async_act
def async_act() -> Generator
The behaviour act.
is_acting
@property
def is_acting() -> bool
This method returns whether there is an active fix being applied.
hard_reset_sleep
@property
def hard_reset_sleep() -> float
Amount of time to sleep before and after performing a hard reset.
We don't need to wait for half the reset pause duration, like in normal cases where we perform a hard reset.
Returns:
the amount of time to sleep in seconds
get_callback_request
def get_callback_request() -> Callable[[Message, "BaseBehaviour"], None]
Wrapper for callback_request(), overridden to remove checks not applicable to TmManager.
try_fix
def try_fix() -> None
This method tries to fix an unhealthy tendermint node.
DegenerateBehaviour Objects
class DegenerateBehaviour(BaseBehaviour, ABC)
An abstract matching behaviour for final and degenerate rounds.
async_act
def async_act() -> Generator
Exit the agent with error when a degenerate round is reached.
make_degenerate_behaviour
def make_degenerate_behaviour(
round_cls: Type[AbstractRound]) -> Type[DegenerateBehaviour]
Make a degenerate behaviour class.
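A class factory of this kind could be sketched with the built-in type() as shown below. ToyDegenerateBehaviour, FinishedRound and the naming scheme for the generated class are hypothetical; the real function may build and name the class differently.

```python
from typing import Type


class ToyDegenerateBehaviour:
    """Hypothetical stand-in for DegenerateBehaviour."""

    matching_round: Type = object


class FinishedRound:
    """Hypothetical final round class."""


def make_toy_degenerate_behaviour(round_cls: Type) -> Type[ToyDegenerateBehaviour]:
    """Create a subclass whose matching_round points at round_cls."""
    name = f"DegenerateBehaviour_{round_cls.__name__}"  # assumed naming scheme
    return type(name, (ToyDegenerateBehaviour,), {"matching_round": round_cls})


behaviour_cls = make_toy_degenerate_behaviour(FinishedRound)
```

Generating the class on demand avoids hand-writing one near-identical behaviour per final or degenerate round.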