packages.valory.skills.abstract_round_abci.base
This module contains the base classes for the model classes of the skill.
get_
name
def get_name(prop: Any) -> str
Get the name of a property.
ABCIAppException Objects
class ABCIAppException(Exception)
A parent class for all exceptions related to the ABCIApp.
SignatureNotValidError Objects
class SignatureNotValidError(ABCIAppException)
Error raised when a signature is invalid.
AddBlockError Objects
class AddBlockError(ABCIAppException)
Exception raised when a block addition is not valid.
ABCIAppInternalError Objects
class ABCIAppInternalError(ABCIAppException)
Internal error due to a bad implementation of the ABCIApp.
__
init__
def __init__(message: str, *args: Any) -> None
Initialize the error object.
TransactionTypeNotRecognizedError Objects
class TransactionTypeNotRecognizedError(ABCIAppException)
Error raised when a transaction type is not recognized.
TransactionNotValidError Objects
class TransactionNotValidError(ABCIAppException)
Error raised when a transaction is not valid.
LateArrivingTransaction Objects
class LateArrivingTransaction(ABCIAppException)
Error raised when the transaction belongs to previous round.
AbstractRoundInternalError Objects
class AbstractRoundInternalError(ABCIAppException)
Internal error due to a bad implementation of the AbstractRound.
__
init__
def __init__(message: str, *args: Any) -> None
Initialize the error object.
_
MetaPayload Objects
class _MetaPayload(ABCMeta)
Payload metaclass.
The purpose of this metaclass is to remember the association between the type of payload and the payload class to build it. This is necessary to recover the right payload class to instantiate at decoding time.
__
new__
def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
**kwargs: Any) -> Type
Create a new class object.
BaseTxPayload Objects
@dataclass(frozen=True)
class BaseTxPayload(metaclass=_MetaPayload)
This class represents a base class for transaction payload classes.
data
@property
def data() -> Dict[str, Any]
Data
values
@property
def values() -> Tuple[Any, ...]
Values
json
@property
def json() -> Dict[str, Any]
Json
from_
json
@classmethod
def from_json(cls, obj: Dict) -> "BaseTxPayload"
Decode the payload.
with_
new_
id
def with_new_id() -> "BaseTxPayload"
Create a new payload with the same content but new id.
encode
def encode() -> bytes
Encode
decode
@classmethod
def decode(cls, obj: bytes) -> "BaseTxPayload"
Decode
Transaction Objects
@dataclass(frozen=True)
class Transaction(ABC)
Class to represent a transaction for the ephemeral chain of a period.
encode
def encode() -> bytes
Encode the transaction.
decode
@classmethod
def decode(cls, obj: bytes) -> "Transaction"
Decode the transaction.
verify
def verify(ledger_id: str) -> None
Verify the signature is correct.
Arguments:
ledger_id
: the ledger id of the address
Raises:
SignatureNotValidError
: if the signature is not valid.
Block Objects
class Block()
Class to represent (a subset of) data of a Tendermint block.
__
init__
def __init__(header: Header, transactions: Sequence[Transaction]) -> None
Initialize the block.
transactions
@property
def transactions() -> Tuple[Transaction, ...]
Get the transactions.
timestamp
@property
def timestamp() -> datetime.datetime
Get the block timestamp.
Blockchain Objects
class Blockchain()
Class to represent a (naive) Tendermint blockchain.
The consistency of the data in the blocks is guaranteed by Tendermint.
__
init__
def __init__(height_offset: int = 0, is_init: bool = True) -> None
Initialize the blockchain.
is_
init
@property
def is_init() -> bool
Returns true if the blockchain is initialized.
add_
block
def add_block(block: Block) -> None
Add a block to the list.
height
@property
def height() -> int
Get the height.
Tendermint's height starts from 1. A return value equal to 0 means empty blockchain.
Returns:
the height.
length
@property
def length() -> int
Get the blockchain length.
blocks
@property
def blocks() -> Tuple[Block, ...]
Get the blocks.
last_
block
@property
def last_block() -> Block
Returns the last stored block.
BlockBuilder Objects
class BlockBuilder()
Helper class to build a block.
__
init__
def __init__() -> None
Initialize the block builder.
reset
def reset() -> None
Reset the temporary data structures.
header
@property
def header() -> Header
Get the block header.
Returns:
the block header
header
@header.setter
def header(header: Header) -> None
Set the header.
transactions
@property
def transactions() -> Tuple[Transaction, ...]
Get the sequence of transactions.
add_
transaction
def add_transaction(transaction: Transaction) -> None
Add a transaction.
get_
block
def get_block() -> Block
Get the block.
AbciAppDB Objects
class AbciAppDB()
Class to represent all data replicated across agents.
This class stores all the data in self._data. Every entry in this dict represents an optional "period" within your app execution. The concept of a period is user-defined, so it might be something like a sequence of rounds that together form a logical cycle of its execution, or it might make no sense at all (hence its optionality), in which case only period 0 will be used.
Every "period" entry stores a dict where every key is a saved parameter and its corresponding value a list containing the history of the parameter values. For instance, for period 0:
0: {"parameter_name": [parameter_history]}
A complete database could look like this:
data = { 0: { "participants": [ {"participant_a", "participant_b", "participant_c", "participant_d"}, {"participant_a", "participant_b", "participant_c"}, {"participant_a", "participant_b", "participant_c", "participant_d"}, ], "other_parameter": [0, 2, 8] }, 1: { "participants": [ {"participant_a", "participant_c", "participant_d"}, {"participant_a", "participant_b", "participant_c", "participant_d"}, {"participant_a", "participant_b", "participant_c"}, {"participant_a", "participant_b", "participant_d"}, {"participant_a", "participant_b", "participant_c", "participant_d"}, ], "other_parameter": [3, 19, 10, 32, 6] }, 2: ... }
Adding and removing data from the current period
To update the current period entry, just call update() on the class. The new values will be appended to the current list for each updated parameter.
To clean up old data from the current period entry, call cleanup_current_histories(cleanup_history_depth_current), where cleanup_history_depth_current is the amount of data that you want to keep after the cleanup. The newest cleanup_history_depth_current values will be kept for each parameter in the DB.
Creating and removing old periods
To create a new period entry, call create() on the class. The new values will be stored in a new list for each updated parameter.
To remove old periods, call cleanup(cleanup_history_depth, [cleanup_history_depth_current]), where cleanup_history_depth is the amount of periods that you want to keep after the cleanup. The newest cleanup_history_depth periods will be kept. If you also specify cleanup_history_depth_current, cleanup_current_histories will be also called (see previous point).
The parameters cleanup_history_depth and cleanup_history_depth_current can also be configured in skill.yaml so they are used automatically when the cleanup method is called from AbciApp.cleanup().
Memory warning
The database is implemented in such a way as to avoid indirect modification of its contents. It copies all the mutable data structures*, which means that it consumes more memory than expected. This is necessary because otherwise its contents could be modified from the behaviour side, which is a safety concern.
The effect of this on the memory usage should not be a big concern, because:
1. The synchronized data of the agents are not intended to store large amount of data.
IPFS should be used in such cases, and only the hash should be synchronized in the db.
2. The data are automatically wiped after a predefined `cleanup_history` depth as described above.
3. The retrieved data are only meant to be used for a short amount of time,
e.g., to perform a decision on a behaviour, which means that the gc will collect them before they are noticed.
* The built-in `copy` module is used, which automatically detects if an item is immutable and skips copying it. For more information take a look at the `_deepcopy_atomic` method and its usage: https://github.com/python/cpython/blob/3.10/Lib/copy.py#L182-L183
__
init__
def __init__(setup_data: Dict[str, List[Any]],
cross_period_persisted_keys: Optional[FrozenSet[str]] = None,
logger: Optional[logging.Logger] = None) -> None
Initialize the AbciApp database.
setup_data must be passed as a Dict[str, List[Any]] (the database internal format). The staticmethod 'data_to_lists' can be used to convert from Dict[str, Any] to Dict[str, List[Any]] before instantiating this class.
Arguments:
setup_data
: the setup data
cross_period_persisted_keys
: data keys that will be kept after a new period starts
logger
: the logger of the abci app
setup_
data
@property
def setup_data() -> Dict[str, Any]
Get the setup_data without entries which have empty values.
Returns:
the setup_data
reset_
index
@property
def reset_index() -> int
Get the current reset index.
round_
count
@property
def round_count() -> int
Get the round count.
round_
count
@round_count.setter
def round_count(round_count: int) -> None
Set the round count.
cross_
period_
persisted_
keys
@property
def cross_period_persisted_keys() -> FrozenSet[str]
Keys in the database which are persistent across periods.
get
def get(key: str, default: Any = VALUE_NOT_PROVIDED) -> Optional[Any]
Given a key, get its last value for the current reset index.
get_
strict
def get_strict(key: str) -> Any
Get a value from the data dictionary and raise if it is None.
validate
@staticmethod
def validate(data: Any) -> None
Validate if the given data are json serializable and therefore can be accepted into the database.
Arguments:
data
: the data to check.
Raises:
ABCIAppInternalError
: If the data are not serializable.
update
def update(**kwargs: Any) -> None
Update the current data.
create
def create(**kwargs: Any) -> None
Add a new entry to the data.
Passes automatically the values of the cross_period_persisted_keys
to the next period.
Arguments:
kwargs
: keyword arguments
get_
latest_
from_
reset_
index
def get_latest_from_reset_index(reset_index: int) -> Dict[str, Any]
Get the latest key-value pairs from the data dictionary for the specified period.
get_
latest
def get_latest() -> Dict[str, Any]
Get the latest key-value pairs from the data dictionary for the current period.
increment_
round_
count
def increment_round_count() -> None
Increment the round count.
__
repr__
def __repr__() -> str
Return a string representation of the data.
cleanup
def cleanup(cleanup_history_depth: int,
cleanup_history_depth_current: Optional[int] = None) -> None
Reset the db, keeping only the latest entries (periods).
If cleanup_history_depth_current has been also set, also clear oldest historic values in the current entry.
Arguments:
cleanup_history_depth
: depth to clean up history
cleanup_history_depth_current
: whether or not to clean up current entry too.
cleanup_
current_
histories
def cleanup_current_histories(cleanup_history_depth_current: int) -> None
Reset the parameter histories for the current entry (period), keeping only the latest values for each parameter.
serialize
def serialize() -> str
Serialize the data of the database to a string.
sync
def sync(serialized_data: str) -> None
Synchronize the data using a serialized object.
Arguments:
serialized_data
: the serialized data to use in order to sync the db.
Raises:
ABCIAppInternalError
: if the given data cannot be deserialized.
hash
def hash() -> bytes
Create a hash of the data.
data_
to_
lists
@staticmethod
def data_to_lists(data: Dict[str, Any]) -> Dict[str, List[Any]]
Convert Dict[str, Any] to Dict[str, List[Any]].
BaseSynchronizedData Objects
class BaseSynchronizedData()
Class to represent the synchronized data.
This is the relevant data constructed and replicated by the agents.
__
init__
def __init__(db: AbciAppDB) -> None
Initialize the synchronized data.
db
@property
def db() -> AbciAppDB
Get DB.
round_
count
@property
def round_count() -> int
Get the round count.
period_
count
@property
def period_count() -> int
Get the period count.
Periods are executions between calls to AbciAppDB.create(), so as soon as it is called, a new period begins. It is useful to have a logical subdivision of the FSM execution. For example, if AbciAppDB.create() is called during reset, then a period will be the execution between resets.
Returns:
the period count
participants
@property
def participants() -> FrozenSet[str]
Get the currently active participants.
all_
participants
@property
def all_participants() -> FrozenSet[str]
Get all registered participants.
max_
participants
@property
def max_participants() -> int
Get the number of all the participants.
consensus_
threshold
@property
def consensus_threshold() -> int
Get the consensus threshold.
sorted_
participants
@property
def sorted_participants() -> Sequence[str]
Get the sorted participants' addresses.
The addresses are sorted according to their hexadecimal value; this is the reason we use key=str.lower as comparator.
This property is useful when interacting with the Safe contract.
Returns:
the sorted participants' addresses
nb_
participants
@property
def nb_participants() -> int
Get the number of participants.
slashing_
config
@property
def slashing_config() -> str
Get the slashing configuration.
slashing_
config
@slashing_config.setter
def slashing_config(config: str) -> None
Set the slashing configuration.
update
def update(synchronized_data_class: Optional[Type] = None,
**kwargs: Any) -> "BaseSynchronizedData"
Copy and update the current data.
create
def create(
synchronized_data_class: Optional[Type] = None
) -> "BaseSynchronizedData"
Copy and update with new data. Set values are stored as sorted tuples to the db for determinism.
__
repr__
def __repr__() -> str
Return a string representation of the data.
keeper_
randomness
@property
def keeper_randomness() -> float
Get the keeper's random number [0-1].
most_
voted_
randomness
@property
def most_voted_randomness() -> str
Get the most_voted_randomness.
most_
voted_
keeper_
address
@property
def most_voted_keeper_address() -> str
Get the most_voted_keeper_address.
is_
keeper_
set
@property
def is_keeper_set() -> bool
Check whether keeper is set.
blacklisted_
keepers
@property
def blacklisted_keepers() -> Set[str]
Get the current cycle's blacklisted keepers who cannot submit a transaction.
participant_
to_
selection
@property
def participant_to_selection() -> DeserializedCollection
Get the participant_to_selection.
participant_
to_
randomness
@property
def participant_to_randomness() -> DeserializedCollection
Get the participant_to_randomness.
participant_
to_
votes
@property
def participant_to_votes() -> DeserializedCollection
Get the participant_to_votes.
safe_
contract_
address
@property
def safe_contract_address() -> str
Get the safe contract address.
_
MetaAbstractRound Objects
class _MetaAbstractRound(ABCMeta)
A metaclass that validates AbstractRound's attributes.
__
new__
def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
**kwargs: Any) -> Type
Initialize the class.
AbstractRound Objects
class AbstractRound(Generic[EventType], ABC, metaclass=_MetaAbstractRound)
This class represents an abstract round.
A round is a state of the FSM App execution. It usually involves interactions between participants in the FSM App, although this is not enforced at this level of abstraction.
Concrete classes must set: - synchronized_data_class: the data class associated with this round; - payload_class: the payload type that is allowed for this round;
Optionally, round_id can be defined, although it is recommended to use the autogenerated id.
__
init__
def __init__(
synchronized_data: BaseSynchronizedData,
context: SkillContext,
previous_round_payload_class: Optional[Type[BaseTxPayload]] = None
) -> None
Initialize the round.
auto_
round_
id
@classmethod
def auto_round_id(cls) -> str
Get round id automatically.
This method returns the auto generated id from the class name if the class variable round_id is not set on the child class. Otherwise, it returns the class variable round_id.
round_
id
@property
def round_id() -> str
Get round id.
synchronized_
data
@property
def synchronized_data() -> BaseSynchronizedData
Get the synchronized data.
check_
transaction
def check_transaction(transaction: Transaction) -> None
Check transaction against the current state.
Arguments:
transaction
: the transaction
process_
transaction
def process_transaction(transaction: Transaction) -> None
Process a transaction.
By convention, the payload handler should be a method of the class that is named '{payload_name}'.
Arguments:
transaction
: the transaction.
end_
block
@abstractmethod
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
The role of this method is to check whether the round is considered ended.
If the round is ended, the return value is - the final result of the round. - the event that triggers a transition. If None, the period in which the round was executed is considered ended.
This is done after each block because we consider the consensus engine's block, and not the transaction, as the smallest unit on which the consensus is reached; in other words, each read operation on the state should be done only after each block, and not after each transaction.
check_
payload_
type
def check_payload_type(transaction: Transaction) -> None
Check the transaction is of the allowed transaction type.
Arguments:
transaction
: the transaction
Raises:
TransactionTypeNotRecognizedError
: if the transaction is not of the transaction type allowed in the current state.
check_
majority_
possible_
with_
new_
voter
def check_majority_possible_with_new_voter(
votes_by_participant: Dict[str, BaseTxPayload],
new_voter: str,
new_vote: BaseTxPayload,
nb_participants: int,
exception_cls: Type[ABCIAppException] = ABCIAppException) -> None
Check that a Byzantine majority is achievable, once a new vote is added.
Arguments:
votes_by_participant
: a mapping from a participant to its vote, before the new vote is added
new_voter
: the new voter
new_vote
: the new vote
nb_participants
: the total number of participants
exception_cls
: the class of the exception to raise in case the check fails.
Raises:
exception_cls
: in case the check does not pass.
check_
majority_
possible
def check_majority_possible(
votes_by_participant: Dict[str, BaseTxPayload],
nb_participants: int,
exception_cls: Type[ABCIAppException] = ABCIAppException) -> None
Check that a Byzantine majority is still achievable.
The idea is that, even if all the votes have not been delivered yet, it can be deduced whether a quorum cannot be reached due to divergent preferences among the voters and due to a too small number of other participants whose vote has not been delivered yet.
The check fails iff:
nb_remaining_votes + largest_nb_votes < quorum
That is, if the number of remaining votes is not enough for the most voted item so far to reach the quorum.
Preconditions on the input: - the size of votes_by_participant should not be greater than "nb_participants - 1" voters - new voter must not be in the current votes_by_participant
Arguments:
votes_by_participant
: a mapping from a participant to its vote
nb_participants
: the total number of participants
exception_cls
: the class of the exception to raise in case the check fails.
Raises:
exception_cls
: in case the check does not pass.
is_
majority_
possible
def is_majority_possible(votes_by_participant: Dict[str, BaseTxPayload],
nb_participants: int) -> bool
Return true if a Byzantine majority is achievable, false otherwise.
Arguments:
votes_by_participant
: a mapping from a participant to its vote
nb_participants
: the total number of participants
Returns:
True if the majority is still possible, false otherwise.
check_
payload
@abstractmethod
def check_payload(payload: BaseTxPayload) -> None
Check payload.
process_
payload
@abstractmethod
def process_payload(payload: BaseTxPayload) -> None
Process payload.
DegenerateRound Objects
class DegenerateRound(AbstractRound, ABC)
This class represents the finished round during operation.
It is a sink round.
check_
payload
def check_payload(payload: BaseTxPayload) -> None
Check payload.
process_
payload
def process_payload(payload: BaseTxPayload) -> None
Process payload.
end_
block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
End block.
CollectionRound Objects
class CollectionRound(AbstractRound, ABC)
CollectionRound.
This class represents abstract logic for collection based rounds where the round object needs to collect data from different agents. The data might for example be from a voting round or estimation round.
`_allow_rejoin_payloads` is used to allow agents not currently active to deliver a payload.
__
init__
def __init__(*args: Any, **kwargs: Any)
Initialize the collection round.
serialize_
collection
@staticmethod
def serialize_collection(
collection: DeserializedCollection) -> SerializedCollection
Serialize a deserialized collection.
deserialize_
collection
@staticmethod
def deserialize_collection(
serialized: SerializedCollection) -> DeserializedCollection
Deserialize a serialized collection.
serialized_
collection
@property
def serialized_collection() -> SerializedCollection
A collection with the addresses mapped to serialized payloads.
accepting_
payloads_
from
@property
def accepting_payloads_from() -> FrozenSet[str]
Accepting from the active set, or also from (re)joiners
payloads
@property
def payloads() -> List[BaseTxPayload]
Get all agent payloads
payload_
values_
count
@property
def payload_values_count() -> Counter
Get count of payload values.
process_
payload
def process_payload(payload: BaseTxPayload) -> None
Process payload.
check_
payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
_
CollectUntilAllRound Objects
class _CollectUntilAllRound(CollectionRound, ABC)
_CollectUntilAllRound
This class represents abstract logic for when rounds need to collect payloads from all agents.
This round should only be used when non-BFT behaviour is acceptable.
check_
payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
process_
payload
def process_payload(payload: BaseTxPayload) -> None
Process payload.
collection_
threshold_
reached
@property
def collection_threshold_reached() -> bool
Check that the collection threshold has been reached.
CollectDifferentUntilAllRound Objects
class CollectDifferentUntilAllRound(_CollectUntilAllRound, ABC)
CollectDifferentUntilAllRound
This class represents logic for rounds where a round needs to collect different payloads from each agent.
This round should only be used for registration of new agents when there is synchronization of the db.
check_
payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
CollectSameUntilAllRound Objects
class CollectSameUntilAllRound(_CollectUntilAllRound, ABC)
This class represents logic for when a round needs to collect the same payload from all the agents.
This round should only be used for registration of new agents when there is no synchronization of the db.
check_
payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
common_
payload
@property
def common_payload() -> Any
Get the common payload among the agents.
common_
payload_
values
@property
def common_payload_values() -> Tuple[Any, ...]
Get the common payload values among the agents.
CollectSameUntilThresholdRound Objects
class CollectSameUntilThresholdRound(CollectionRound, ABC)
CollectSameUntilThresholdRound
This class represents logic for rounds where a round needs to collect same payload from k of n agents.
`done_event` is emitted when a) the collection threshold (k of n) is reached, and b) the most voted payload has non-empty attributes. In this case all payloads are saved under `collection_key` and the most voted payload attributes are saved under `selection_key`.
`none_event` is emitted when a) the collection threshold (k of n) is reached, and b) the most voted payload has only empty attributes.
`no_majority_event` is emitted when it is impossible to reach a k of n majority.
threshold_
reached
@property
def threshold_reached() -> bool
Check if the threshold has been reached.
most_
voted_
payload
@property
def most_voted_payload() -> Any
Get the most voted payload value.
Kept for backward compatibility.
most_
voted_
payload_
values
@property
def most_voted_payload_values() -> Tuple[Any, ...]
Get the most voted payload values.
end_
block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
OnlyKeeperSendsRound Objects
class OnlyKeeperSendsRound(AbstractRound, ABC)
OnlyKeeperSendsRound
This class represents logic for rounds where only one agent sends a payload.
`done_event` is emitted when a) the keeper payload has been received and b) the keeper payload has non-empty attributes. In this case all attributes are saved under `payload_key`.
`fail_event` is emitted when a) the keeper payload has been received and b) the keeper payload has only empty attributes.
process_
payload
def process_payload(payload: BaseTxPayload) -> None
Handle a deploy safe payload.
check_
payload
def check_payload(payload: BaseTxPayload) -> None
Check a deploy safe payload can be applied to the current state.
end_
block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
VotingRound Objects
class VotingRound(CollectionRound, ABC)
VotingRound
This class represents logic for rounds where a round needs votes from agents. Votes are in the form of `True` (positive), `False` (negative) and `None` (abstain). The round ends when k of n agents make the same vote.
`done_event` is emitted when the collection threshold (k of n) is reached with k positive votes. In this case all payloads are saved under `collection_key`.
`negative_event` is emitted when the collection threshold (k of n) is reached with k negative votes.
`none_event` is emitted when the collection threshold (k of n) is reached with k abstain votes.
`no_majority_event` is emitted when it is impossible to reach a k of n majority for any of the options.
vote_
count
@property
def vote_count() -> Counter
Get agent payload vote count
positive_
vote_
threshold_
reached
@property
def positive_vote_threshold_reached() -> bool
Check that the vote threshold has been reached.
negative_
vote_
threshold_
reached
@property
def negative_vote_threshold_reached() -> bool
Check that the vote threshold has been reached.
none_
vote_
threshold_
reached
@property
def none_vote_threshold_reached() -> bool
Check that the vote threshold has been reached.
end_
block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
CollectDifferentUntilThresholdRound Objects
class CollectDifferentUntilThresholdRound(CollectionRound, ABC)
CollectDifferentUntilThresholdRound
This class represents logic for rounds where a round needs to collect different payloads from k of n agents.
`done_event` is emitted when a) the required block confirmations have been met, and b) the collection threshold (k of n) is reached. In this case all payloads are saved under `collection_key`.
Use an extended `required_block_confirmations` to allow for the arrival of more payloads.
collection_
threshold_
reached
@property
def collection_threshold_reached() -> bool
Check if the threshold has been reached.
end_
block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
CollectNonEmptyUntilThresholdRound Objects
class CollectNonEmptyUntilThresholdRound(CollectDifferentUntilThresholdRound,
ABC)
CollectNonEmptyUntilThresholdRound
This class represents logic for rounds where a round needs to collect optionally different payloads from k of n agents, where we only keep the non-empty attributes.
`done_event` is emitted when a) the required block confirmations have been met, b) the collection threshold (k of n) is reached, and c) some non-empty attribute values have been collected. In this case all payloads are saved under `collection_key`. Under `selection_key` the non-empty attribute values are stored.
`none_event` is emitted when a) the required block confirmations have been met, b) the collection threshold (k of n) is reached, and c) no non-empty attribute values have been collected.
Attention: A `none_event` might be triggered even though some of the remaining n-k agents might send non-empty attributes! An extended `required_block_confirmations` can alleviate this somewhat.
end_
block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
TimeoutEvent Objects
@dataclass(order=True)
class TimeoutEvent(Generic[EventType])
Timeout event.
Timeouts Objects
class Timeouts(Generic[EventType])
Class to keep track of pending timeouts.
__
init__
def __init__() -> None
Initialize.
size
@property
def size() -> int
Get the size of the timeout queue.
add_
timeout
def add_timeout(deadline: datetime.datetime, event: EventType) -> int
Add a timeout.
cancel_
timeout
def cancel_timeout(entry_count: int) -> None
Remove a timeout.
Arguments:
entry_count
: the entry id to remove.
Raises:
KeyError
: if the entry count is not found.
pop_
earliest_
cancelled_
timeouts
def pop_earliest_cancelled_timeouts() -> None
Pop earliest cancelled timeouts.
get_
earliest_
timeout
def get_earliest_timeout() -> Tuple[datetime.datetime, Any]
Get the earliest timeout-event pair.
pop_
timeout
def pop_timeout() -> Tuple[datetime.datetime, Any]
Remove and return the earliest timeout-event pair.
_
MetaAbciApp Objects
class _MetaAbciApp(ABCMeta)
A metaclass that validates AbciApp's attributes.
__
new__
def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
**kwargs: Any) -> Type
Initialize the class.
BackgroundAppType Objects
class BackgroundAppType(Enum)
The type of a background app.
Please note that the values correspond to the priority in which the background apps should be processed when updating rounds.
correct_
types
@staticmethod
def correct_types() -> Set[str]
Return the correct types only.
BackgroundAppConfig Objects
@dataclass(frozen=True)
class BackgroundAppConfig(Generic[EventType])
Necessary configuration for a background app.
For a deeper understanding of the various types of background apps and how the config influences the generated background app's type, please refer to the `BackgroundApp` class. The `specify_type` method provides further insight on the subject matter.
BackgroundApp Objects
class BackgroundApp(Generic[EventType])
A background app.
__
init__
def __init__(config: BackgroundAppConfig) -> None
Initialize the BackgroundApp.
__
eq__
def __eq__(other: Any) -> bool
Custom equality comparing operator.
__
hash__
def __hash__() -> int
Custom hashing operator
specify_
type
def specify_type() -> BackgroundAppType
Specify the type of the background app.
setup
def setup(initial_synchronized_data: BaseSynchronizedData,
context: SkillContext) -> None
Set up the background round.
background_
round
@property
def background_round() -> AbstractRound
Get the background round.
process_
transaction
def process_transaction(transaction: Transaction, dry: bool = False) -> bool
Process a transaction.
TransitionBackup Objects
@dataclass
class TransitionBackup()
Holds transition related information as a backup in case we want to transition back from a background app.
AbciApp Objects
class AbciApp(Generic[EventType], ABC, metaclass=_MetaAbciApp)
Base class for ABCI apps.
Concrete classes of this class implement the ABCI App.
__
init__
def __init__(synchronized_data: BaseSynchronizedData, logger: logging.Logger,
context: SkillContext)
Initialize the AbciApp.
is_
abstract
@classmethod
def is_abstract(cls) -> bool
Return if the abci app is abstract.
add_
background_
app
@classmethod
def add_background_app(cls, config: BackgroundAppConfig) -> Type["AbciApp"]
Sets the background related class variables.
For a deeper understanding of the various types of background apps and how the inputs of this method influence
the generated background app's type, please refer to the BackgroundApp
class.
The specify_type
method provides further insight on the subject matter.
Arguments:
config
: the background app's configuration.
Returns:
the AbciApp
with the new background app contained in the background_apps
set.
synchronized_
data
@property
def synchronized_data() -> BaseSynchronizedData
Return the current synchronized data.
get_
all_
rounds
@classmethod
def get_all_rounds(cls) -> Set[AppState]
Get all the round states.
get_
all_
events
@classmethod
def get_all_events(cls) -> Set[EventType]
Get all the events.
get_
all_
round_
classes
@classmethod
def get_all_round_classes(
cls,
bg_round_cls: Set[Type[AbstractRound]],
include_background_rounds: bool = False) -> Set[AppState]
Get all round classes.
bg_
apps_
prioritized
@property
def bg_apps_prioritized() -> Tuple[List[BackgroundApp], ...]
Get the background apps grouped and prioritized by their types.
last_
timestamp
@property
def last_timestamp() -> datetime.datetime
Get last timestamp.
setup
def setup() -> None
Set up the behaviour.
schedule_
round
def schedule_round(round_cls: AppState) -> None
Schedule a round class.
This means:
- cancel timeout events belonging to the current round;
- instantiate the new round class and set it as current round;
- create new timeout events and schedule them according to the latest timestamp.
Arguments:
round_cls
: the class of the new round.
current_
round
@property
def current_round() -> AbstractRound
Get the current round.
current_
round_
id
@property
def current_round_id() -> Optional[str]
Get the current round id.
current_
round_
height
@property
def current_round_height() -> int
Get the current round height.
last_
round_
id
@property
def last_round_id() -> Optional[str]
Get the last round id.
is_
finished
@property
def is_finished() -> bool
Check whether the AbciApp execution has finished.
latest_
result
@property
def latest_result() -> Optional[BaseSynchronizedData]
Get the latest result of the round.
cleanup_
timeouts
def cleanup_timeouts() -> None
Remove all timeouts.
Note that this method is meant to be used only when performing recovery. Calling it in normal execution will result in unexpected behaviour.
check_
transaction
def check_transaction(transaction: Transaction) -> None
Check a transaction.
process_
transaction
def process_transaction(transaction: Transaction, dry: bool = False) -> None
Process a transaction.
The background rounds run concurrently with other (normal) rounds. First we check if the transaction is meant for a background round, if not we forward it to the current round object.
Arguments:
transaction
: the transaction.
dry
: whether the transaction should only be checked and not processed.
process_
event
def process_event(event: EventType,
result: Optional[BaseSynchronizedData] = None) -> None
Process a round event.
update_
time
def update_time(timestamp: datetime.datetime) -> None
Observe timestamp from last block.
Arguments:
timestamp
: the latest block's timestamp.
cleanup
def cleanup(cleanup_history_depth: int,
cleanup_history_depth_current: Optional[int] = None) -> None
Clear data.
cleanup_
current_
histories
def cleanup_current_histories(cleanup_history_depth_current: int) -> None
Reset the parameter histories for the current entry (period), keeping only the latest values for each parameter.
OffenseType Objects
class OffenseType(Enum)
The types of offenses.
The values of the enum represent the seriousness of the offence.
Offense types with values >1000 are considered serious.
See also is_light_offence
and is_serious_offence
functions.
is_
light_
offence
def is_light_offence(offence_type: OffenseType) -> bool
Check if an offence type is light.
is_
serious_
offence
def is_serious_offence(offence_type: OffenseType) -> bool
Check if an offence type is serious.
light_
offences
def light_offences() -> Iterator[OffenseType]
Get the light offences.
serious_
offences
def serious_offences() -> Iterator[OffenseType]
Get the serious offences.
AvailabilityWindow Objects
class AvailabilityWindow()
A cyclic array with a maximum length that holds boolean values.
When an element is added to the array and the maximum length has been reached,
the oldest element is removed. Two attributes num_positive
and num_negative
reflect the number of positive and negative elements in the AvailabilityWindow,
they are updated every time a new element is added.
__
init__
def __init__(max_length: int) -> None
Initializes the AvailabilityWindow
instance.
Arguments:
max_length
: the maximum length of the cyclic array.
__
eq__
def __eq__(other: Any) -> bool
Compare AvailabilityWindow
objects.
has_
bad_
availability_
rate
def has_bad_availability_rate(threshold: float = 0.95) -> bool
Whether the agent to which the window belongs has a bad availability rate or not.
add
def add(value: bool) -> None
Adds a new boolean value to the cyclic array.
If the maximum length has been reached, the oldest element is removed.
Arguments:
value
: The boolean value to add to the cyclic array.
to_
dict
def to_dict() -> Dict[str, int]
Returns a dictionary representation of the AvailabilityWindow
instance.
from_
dict
@classmethod
def from_dict(cls, data: Dict[str, int]) -> "AvailabilityWindow"
Initializes an AvailabilityWindow
instance from a dictionary.
OffenceStatus Objects
@dataclass
class OffenceStatus()
A class that holds information about offence status for an agent.
slash_
amount
def slash_amount(light_unit_amount: int, serious_unit_amount: int) -> int
Get the slash amount of the current status.
OffenseStatusEncoder Objects
class OffenseStatusEncoder(json.JSONEncoder)
A custom JSON encoder for the offence status dictionary.
default
def default(o: Any) -> Any
The default JSON encoder.
OffenseStatusDecoder Objects
class OffenseStatusDecoder(json.JSONDecoder)
A custom JSON decoder for the offence status dictionary.
__
init__
def __init__(*args: Any, **kwargs: Any) -> None
Initialize the custom JSON decoder.
hook
@staticmethod
def hook(
data: Dict[str, Any]
) -> Union[AvailabilityWindow, OffenceStatus, Dict[str, OffenceStatus]]
Perform the custom decoding.
PendingOffense Objects
@dataclass(frozen=True, eq=True)
class PendingOffense()
A dataclass to represent offences that need to be addressed.
__
post_
init__
def __post_init__() -> None
Post initialization for offence type conversion in case it is given as an int
.
SlashingNotConfiguredError Objects
class SlashingNotConfiguredError(Exception)
Custom exception raised when slashing configuration is requested but is not available.
DEFAULT_
PENDING_
OFFENCE_
TTL
1 hour
RoundSequence Objects
class RoundSequence()
This class represents a sequence of rounds.
It is a generic class that keeps track of the current round of the consensus period. It receives 'deliver_tx' requests from the ABCI handlers and forwards them to the current active round instance, which implements the ABCI app logic. It also schedules the next round (if any) whenever a round terminates.
__
init__
def __init__(context: SkillContext, abci_app_cls: Type[AbciApp])
Initialize the round.
enable_
slashing
def enable_slashing() -> None
Enable slashing.
validator_
to_
agent
@property
def validator_to_agent() -> Dict[str, str]
Get the mapping of the validators' addresses to their agent addresses.
validator_
to_
agent
@validator_to_agent.setter
def validator_to_agent(validator_to_agent: Dict[str, str]) -> None
Set the mapping of the validators' addresses to their agent addresses.
offence_
status
@property
def offence_status() -> Dict[str, OffenceStatus]
Get the mapping of the agents' addresses to their offence status.
offence_
status
@offence_status.setter
def offence_status(offence_status: Dict[str, OffenceStatus]) -> None
Set the mapping of the agents' addresses to their offence status.
add_
pending_
offence
def add_pending_offence(pending_offence: PendingOffense) -> None
Add a pending offence to the set of pending offences.
Pending offences are offences that have been detected, but not yet agreed upon by the consensus. A pending offence is removed from the set of pending offences and added to the OffenceStatus of a validator when the majority of the agents agree on it.
Arguments:
pending_offence
: the pending offence to add
Returns:
None
sync_
db_
and_
slashing
def sync_db_and_slashing(serialized_db_state: str) -> None
Sync the database and the slashing configuration.
serialized_
offence_
status
def serialized_offence_status() -> str
Serialize the offence status.
store_
offence_
status
def store_offence_status() -> None
Store the serialized offence status.
get_
agent_
address
def get_agent_address(validator: Validator) -> str
Get corresponding agent address from a Validator
instance.
setup
def setup(*args: Any, **kwargs: Any) -> None
Set up the round sequence.
Arguments:
args
: the arguments to pass to the round constructor.
kwargs
: the keyword-arguments to pass to the round constructor.
start_
sync
def start_sync() -> None
Set _syncing_up
flag to true.
If the _syncing_up flag is set to true, the async_act
method won't be executed. For more details refer to
https://github.com/valory-xyz/open-autonomy/issues/247#issuecomment-1012268656
end_
sync
def end_sync() -> None
Set _syncing_up
flag to false.
syncing_
up
@property
def syncing_up() -> bool
Return if the app is in sync mode.
abci_
app
@property
def abci_app() -> AbciApp
Get the AbciApp.
blockchain
@property
def blockchain() -> Blockchain
Get the Blockchain instance.
blockchain
@blockchain.setter
def blockchain(_blockchain: Blockchain) -> None
Set the Blockchain instance.
height
@property
def height() -> int
Get the height.
is_
finished
@property
def is_finished() -> bool
Check if a round sequence has finished.
check_
is_
finished
def check_is_finished() -> None
Check if a round sequence has finished.
current_
round
@property
def current_round() -> AbstractRound
Get current round.
current_
round_
id
@property
def current_round_id() -> Optional[str]
Get the current round id.
current_
round_
height
@property
def current_round_height() -> int
Get the current round height.
last_
round_
id
@property
def last_round_id() -> Optional[str]
Get the last round id.
last_
timestamp
@property
def last_timestamp() -> datetime.datetime
Get the last timestamp.
last_
round_
transition_
timestamp
@property
def last_round_transition_timestamp() -> datetime.datetime
Returns the timestamp for last round transition.
last_
round_
transition_
height
@property
def last_round_transition_height() -> int
Returns the height for last round transition.
last_
round_
transition_
root_
hash
@property
def last_round_transition_root_hash() -> bytes
Returns the root hash for last round transition.
last_
round_
transition_
tm_
height
@property
def last_round_transition_tm_height() -> int
Returns the Tendermint height for last round transition.
latest_
synchronized_
data
@property
def latest_synchronized_data() -> BaseSynchronizedData
Get the latest synchronized_data.
root_
hash
@property
def root_hash() -> bytes
Get the Merkle root hash of the application state.
This is going to be the database's hash. In this way, the app hash will be reflecting our application's state, and will guarantee that all the agents on the chain apply the changes of the arriving blocks in the same way.
Returns:
the root hash to be included as the Header.AppHash in the next block.
tm_
height
@property
def tm_height() -> int
Get Tendermint's current height.
tm_
height
@tm_height.setter
def tm_height(_tm_height: int) -> None
Set Tendermint's current height.
block_
stall_
deadline_
expired
@property
def block_stall_deadline_expired() -> bool
Get if the deadline for not having received any begin block requests from the Tendermint node has expired.
set_
block_
stall_
deadline
def set_block_stall_deadline() -> None
Use the local time of the agent and a predefined tolerance, to specify the expiration of the deadline.
init_
chain
def init_chain(initial_height: int) -> None
Init chain.
begin_
block
def begin_block(header: Header, evidences: Evidences,
last_commit_info: LastCommitInfo) -> None
Begin block.
deliver_
tx
def deliver_tx(transaction: Transaction) -> None
Deliver a transaction.
Appends the transaction to build the block on 'end_block' later.
Arguments:
transaction
: the transaction.
Raises:
None
: an Error otherwise.
end_
block
def end_block() -> None
Process the 'end_block' request.
commit
def commit() -> None
Process the 'commit' request.
reset_
blockchain
def reset_blockchain(is_replay: bool = False, is_init: bool = False) -> None
Reset blockchain after tendermint reset.
Arguments:
is_replay
: whether we are resetting the blockchain while replaying blocks.
is_init
: whether to process blocks before receiving an init_chain req from tendermint.
reset_
state
def reset_state(restart_from_round: str,
round_count: int,
serialized_db_state: Optional[str] = None) -> None
This method resets the state of RoundSequence to the beginning of the period.
Note: This is intended to be used for agent <-> tendermint communication recovery only!
Arguments:
restart_from_round
: from which round to restart the abci. This round should be the first round in the last period.
round_count
: the round count at the beginning of the period -1.
serialized_db_state
: the state of the database at the beginning of the period. If provided, the database will be reset to this state.
PendingOffencesPayload Objects
@dataclass(frozen=True)
class PendingOffencesPayload(BaseTxPayload)
Represent a transaction payload for pending offences.
PendingOffencesRound Objects
class PendingOffencesRound(CollectSameUntilThresholdRound)
Defines the pending offences background round, which runs concurrently with other rounds to sync the offences.
__
init__
def __init__(*args: Any, **kwargs: Any) -> None
Initialize the PendingOffencesRound
.
offence_
status
@property
def offence_status() -> Dict[str, OffenceStatus]
Get the offence status from the round sequence.
end_
block
def end_block() -> None
Process the end of the block for the pending offences background round.
It is important to note that this is a non-standard type of round, meaning it does not emit any events. Instead, it continuously runs in the background. The objective of this round is to consistently monitor the received pending offences and achieve a consensus among the agents.