Skip to content

packages.valory.skills.abstract_round_abci.base

This module contains the base classes for the models classes of the skill.

get_name

def get_name(prop: Any) -> str

Get the name of a property.

ABCIAppException Objects

class ABCIAppException(Exception)

A parent class for all exceptions related to the ABCIApp.

SignatureNotValidError Objects

class SignatureNotValidError(ABCIAppException)

Error raised when a signature is invalid.

AddBlockError Objects

class AddBlockError(ABCIAppException)

Exception raised when a block addition is not valid.

ABCIAppInternalError Objects

class ABCIAppInternalError(ABCIAppException)

Internal error due to a bad implementation of the ABCIApp.

__init__

def __init__(message: str, *args: Any) -> None

Initialize the error object.

TransactionTypeNotRecognizedError Objects

class TransactionTypeNotRecognizedError(ABCIAppException)

Error raised when a transaction type is not recognized.

TransactionNotValidError Objects

class TransactionNotValidError(ABCIAppException)

Error raised when a transaction is not valid.

LateArrivingTransaction Objects

class LateArrivingTransaction(ABCIAppException)

Error raised when the transaction belongs to previous round.

AbstractRoundInternalError Objects

class AbstractRoundInternalError(ABCIAppException)

Internal error due to a bad implementation of the AbstractRound.

__init__

def __init__(message: str, *args: Any) -> None

Initialize the error object.

_MetaPayload Objects

class _MetaPayload(ABCMeta)

Payload metaclass.

The purpose of this metaclass is to remember the association between the type of payload and the payload class to build it. This is necessary to recover the right payload class to instantiate at decoding time.

__new__

def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
            **kwargs: Any) -> Type

Create a new class object.

BaseTxPayload Objects

@dataclass(frozen=True)
class BaseTxPayload(metaclass=_MetaPayload)

This class represents a base class for transaction payload classes.

data

@property
def data() -> Dict[str, Any]

Data

values

@property
def values() -> Tuple[Any, ...]

Data

json

@property
def json() -> Dict[str, Any]

Json

from_json

@classmethod
def from_json(cls, obj: Dict) -> "BaseTxPayload"

Decode the payload.

with_new_id

def with_new_id() -> "BaseTxPayload"

Create a new payload with the same content but new id.

encode

def encode() -> bytes

Encode

decode

@classmethod
def decode(cls, obj: bytes) -> "BaseTxPayload"

Decode

Transaction Objects

@dataclass(frozen=True)
class Transaction(ABC)

Class to represent a transaction for the ephemeral chain of a period.

encode

def encode() -> bytes

Encode the transaction.

decode

@classmethod
def decode(cls, obj: bytes) -> "Transaction"

Decode the transaction.

verify

def verify(ledger_id: str) -> None

Verify the signature is correct.

Arguments:

  • ledger_id: the ledger id of the address

Raises:

  • None: SignatureNotValidError: if the signature is not valid.

Block Objects

class Block()

Class to represent (a subset of) data of a Tendermint block.

__init__

def __init__(header: Header, transactions: Sequence[Transaction]) -> None

Initialize the block.

transactions

@property
def transactions() -> Tuple[Transaction, ...]

Get the transactions.

timestamp

@property
def timestamp() -> datetime.datetime

Get the block timestamp.

Blockchain Objects

class Blockchain()

Class to represent a (naive) Tendermint blockchain.

The consistency of the data in the blocks is guaranteed by Tendermint.

__init__

def __init__(height_offset: int = 0, is_init: bool = True) -> None

Initialize the blockchain.

is_init

@property
def is_init() -> bool

Returns true if the blockchain is initialized.

add_block

def add_block(block: Block) -> None

Add a block to the list.

height

@property
def height() -> int

Get the height.

Tendermint's height starts from 1. A return value equal to 0 means empty blockchain.

Returns:

the height.

length

@property
def length() -> int

Get the blockchain length.

blocks

@property
def blocks() -> Tuple[Block, ...]

Get the blocks.

last_block

@property
def last_block() -> Block

Returns the last stored block.

BlockBuilder Objects

class BlockBuilder()

Helper class to build a block.

__init__

def __init__() -> None

Initialize the block builder.

reset

def reset() -> None

Reset the temporary data structures.

@property
def header() -> Header

Get the block header.

Returns:

the block header

header

@header.setter
def header(header: Header) -> None

Set the header.

transactions

@property
def transactions() -> Tuple[Transaction, ...]

Get the sequence of transactions.

add_transaction

def add_transaction(transaction: Transaction) -> None

Add a transaction.

get_block

def get_block() -> Block

Get the block.

AbciAppDB Objects

class AbciAppDB()

Class to represent all data replicated across agents.

This class stores all the data in self._data. Every entry on this dict represents an optional "period" within your app execution. The concept of period is user-defined, so it might be something like a sequence of rounds that together conform a logical cycle of its execution, or it might have no sense at all (thus its optionality) and therefore only period 0 will be used.

Every "period" entry stores a dict where every key is a saved parameter and its corresponding value a list containing the history of the parameter values. For instance, for period 0:

0: {"parameter_name": [parameter_history]}

A complete database could look like this:

data = { 0: { "participants": [ {"participant_a", "participant_b", "participant_c", "participant_d"}, {"participant_a", "participant_b", "participant_c"}, {"participant_a", "participant_b", "participant_c", "participant_d"}, ] }, "other_parameter": [0, 2, 8] }, 1: { "participants": [ {"participant_a", "participant_c", "participant_d"}, {"participant_a", "participant_b", "participant_c", "participant_d"}, {"participant_a", "participant_b", "participant_c"}, {"participant_a", "participant_b", "participant_d"}, {"participant_a", "participant_b", "participant_c", "participant_d"}, ], "other_parameter": [3, 19, 10, 32, 6] }, 2: ... }

Adding and removing data from the current period


To update the current period entry, just call update() on the class. The new values will be appended to the current list for each updated parameter.

To clean up old data from the current period entry, call cleanup_current_histories(cleanup_history_depth_current), where cleanup_history_depth_current is the amount of data that you want to keep after the cleanup. The newest cleanup_history_depth_current values will be kept for each parameter in the DB.

Creating and removing old periods


To create a new period entry, call create() on the class. The new values will be stored in a new list for each updated parameter.

To remove old periods, call cleanup(cleanup_history_depth, [cleanup_history_depth_current]), where cleanup_history_depth is the amount of periods that you want to keep after the cleanup. The newest cleanup_history_depth periods will be kept. If you also specify cleanup_history_depth_current, cleanup_current_histories will be also called (see previous point).

The parameters cleanup_history_depth and cleanup_history_depth_current can also be configured in skill.yaml so they are used automatically when the cleanup method is called from AbciApp.cleanup().

Memory warning


The database is implemented in such a way to avoid indirect modification of its contents. It copies all the mutable data structures*, which means that it consumes more memory than expected. This is necessary because otherwise it would risk chance of modification from the behaviour side, which is a safety concern.

The effect of this on the memory usage should not be a big concern, because:

1. The synchronized data of the agents are not intended to store large amount of data.
 IPFS should be used in such cases, and only the hash should be synchronized in the db.
2. The data are automatically wiped after a predefined `cleanup_history` depth as described above.
3. The retrieved data are only meant to be used for a short amount of time,
 e.g., to perform a decision on a behaviour, which means that the gc will collect them before they are noticed.
  • the in-built copy module is used, which automatically detects if an item is immutable and skips copying it. For more information take a look at the _deepcopy_atomic method and its usage: https://github.com/python/cpython/blob/3.10/Lib/copy.py#L182-L183

__init__

def __init__(setup_data: Dict[str, List[Any]],
             cross_period_persisted_keys: Optional[FrozenSet[str]] = None,
             logger: Optional[logging.Logger] = None) -> None

Initialize the AbciApp database.

setup_data must be passed as a Dict[str, List[Any]] (the database internal format). The staticmethod 'data_to_lists' can be used to convert from Dict[str, Any] to Dict[str, List[Any]] before instantiating this class.

Arguments:

  • setup_data: the setup data
  • cross_period_persisted_keys: data keys that will be kept after a new period starts
  • logger: the logger of the abci app

setup_data

@property
def setup_data() -> Dict[str, Any]

Get the setup_data without entries which have empty values.

Returns:

the setup_data

reset_index

@property
def reset_index() -> int

Get the current reset index.

round_count

@property
def round_count() -> int

Get the round count.

round_count

@round_count.setter
def round_count(round_count: int) -> None

Set the round count.

cross_period_persisted_keys

@property
def cross_period_persisted_keys() -> FrozenSet[str]

Keys in the database which are persistent across periods.

get

def get(key: str, default: Any = VALUE_NOT_PROVIDED) -> Optional[Any]

Given a key, get its last for the current reset index.

get_strict

def get_strict(key: str) -> Any

Get a value from the data dictionary and raise if it is None.

validate

@staticmethod
def validate(data: Any) -> None

Validate if the given data are json serializable and therefore can be accepted into the database.

Arguments:

  • data: the data to check.

Raises:

  • ABCIAppInternalError: If the data are not serializable.

update

def update(**kwargs: Any) -> None

Update the current data.

create

def create(**kwargs: Any) -> None

Add a new entry to the data.

Passes automatically the values of the cross_period_persisted_keys to the next period.

Arguments:

  • kwargs: keyword arguments

get_latest_from_reset_index

def get_latest_from_reset_index(reset_index: int) -> Dict[str, Any]

Get the latest key-value pairs from the data dictionary for the specified period.

get_latest

def get_latest() -> Dict[str, Any]

Get the latest key-value pairs from the data dictionary for the current period.

increment_round_count

def increment_round_count() -> None

Increment the round count.

__repr__

def __repr__() -> str

Return a string representation of the data.

cleanup

def cleanup(cleanup_history_depth: int,
            cleanup_history_depth_current: Optional[int] = None) -> None

Reset the db, keeping only the latest entries (periods).

If cleanup_history_depth_current has been also set, also clear oldest historic values in the current entry.

Arguments:

  • cleanup_history_depth: depth to clean up history
  • cleanup_history_depth_current: whether or not to clean up current entry too.

cleanup_current_histories

def cleanup_current_histories(cleanup_history_depth_current: int) -> None

Reset the parameter histories for the current entry (period), keeping only the latest values for each parameter.

serialize

def serialize() -> str

Serialize the data of the database to a string.

sync

def sync(serialized_data: str) -> None

Synchronize the data using a serialized object.

Arguments:

  • serialized_data: the serialized data to use in order to sync the db.

Raises:

  • ABCIAppInternalError: if the given data cannot be deserialized.

hash

def hash() -> bytes

Create a hash of the data.

data_to_lists

@staticmethod
def data_to_lists(data: Dict[str, Any]) -> Dict[str, List[Any]]

Convert Dict[str, Any] to Dict[str, List[Any]].

BaseSynchronizedData Objects

class BaseSynchronizedData()

Class to represent the synchronized data.

This is the relevant data constructed and replicated by the agents.

__init__

def __init__(db: AbciAppDB) -> None

Initialize the synchronized data.

db

@property
def db() -> AbciAppDB

Get DB.

round_count

@property
def round_count() -> int

Get the round count.

period_count

@property
def period_count() -> int

Get the period count.

Periods are executions between calls to AbciAppDB.create(), so as soon as it is called, a new period begins. It is useful to have a logical subdivision of the FSM execution. For example, if AbciAppDB.create() is called during reset, then a period will be the execution between resets.

Returns:

the period count

participants

@property
def participants() -> FrozenSet[str]

Get the currently active participants.

all_participants

@property
def all_participants() -> FrozenSet[str]

Get all registered participants.

max_participants

@property
def max_participants() -> int

Get the number of all the participants.

consensus_threshold

@property
def consensus_threshold() -> int

Get the consensus threshold.

sorted_participants

@property
def sorted_participants() -> Sequence[str]

Get the sorted participants' addresses.

The addresses are sorted according to their hexadecimal value; this is the reason we use key=str.lower as comparator.

This property is useful when interacting with the Safe contract.

Returns:

the sorted participants' addresses

nb_participants

@property
def nb_participants() -> int

Get the number of participants.

slashing_config

@property
def slashing_config() -> str

Get the slashing configuration.

slashing_config

@slashing_config.setter
def slashing_config(config: str) -> None

Set the slashing configuration.

update

def update(synchronized_data_class: Optional[Type] = None,
           **kwargs: Any) -> "BaseSynchronizedData"

Copy and update the current data.

create

def create(
        synchronized_data_class: Optional[Type] = None
) -> "BaseSynchronizedData"

Copy and update with new data. Set values are stored as sorted tuples to the db for determinism.

__repr__

def __repr__() -> str

Return a string representation of the data.

keeper_randomness

@property
def keeper_randomness() -> float

Get the keeper's random number [0-1].

most_voted_randomness

@property
def most_voted_randomness() -> str

Get the most_voted_randomness.

most_voted_keeper_address

@property
def most_voted_keeper_address() -> str

Get the most_voted_keeper_address.

is_keeper_set

@property
def is_keeper_set() -> bool

Check whether keeper is set.

blacklisted_keepers

@property
def blacklisted_keepers() -> Set[str]

Get the current cycle's blacklisted keepers who cannot submit a transaction.

participant_to_selection

@property
def participant_to_selection() -> DeserializedCollection

Check whether keeper is set.

participant_to_randomness

@property
def participant_to_randomness() -> DeserializedCollection

Check whether keeper is set.

participant_to_votes

@property
def participant_to_votes() -> DeserializedCollection

Check whether keeper is set.

safe_contract_address

@property
def safe_contract_address() -> str

Get the safe contract address.

_MetaAbstractRound Objects

class _MetaAbstractRound(ABCMeta)

A metaclass that validates AbstractRound's attributes.

__new__

def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
            **kwargs: Any) -> Type

Initialize the class.

AbstractRound Objects

class AbstractRound(Generic[EventType], ABC, metaclass=_MetaAbstractRound)

This class represents an abstract round.

A round is a state of the FSM App execution. It usually involves interactions between participants in the FSM App, although this is not enforced at this level of abstraction.

Concrete classes must set: - synchronized_data_class: the data class associated with this round; - payload_class: the payload type that is allowed for this round;

Optionally, round_id can be defined, although it is recommended to use the autogenerated id.

__init__

def __init__(
    synchronized_data: BaseSynchronizedData,
    context: SkillContext,
    previous_round_payload_class: Optional[Type[BaseTxPayload]] = None
) -> None

Initialize the round.

auto_round_id

@classmethod
def auto_round_id(cls) -> str

Get round id automatically.

This method returns the auto generated id from the class name if the class variable behaviour_id is not set on the child class. Otherwise, it returns the class variable behaviour_id.

round_id

@property
def round_id() -> str

Get round id.

synchronized_data

@property
def synchronized_data() -> BaseSynchronizedData

Get the synchronized data.

check_transaction

def check_transaction(transaction: Transaction) -> None

Check transaction against the current state.

Arguments:

  • transaction: the transaction

process_transaction

def process_transaction(transaction: Transaction) -> None

Process a transaction.

By convention, the payload handler should be a method of the class that is named '{payload_name}'.

Arguments:

  • transaction: the transaction.

end_block

@abstractmethod
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]

Process the end of the block.

The role of this method is check whether the round is considered ended.

If the round is ended, the return value is - the final result of the round. - the event that triggers a transition. If None, the period in which the round was executed is considered ended.

This is done after each block because we consider the consensus engine's block, and not the transaction, as the smallest unit on which the consensus is reached; in other words, each read operation on the state should be done only after each block, and not after each transaction.

check_payload_type

def check_payload_type(transaction: Transaction) -> None

Check the transaction is of the allowed transaction type.

Arguments:

  • transaction: the transaction

Raises:

  • None: TransactionTypeNotRecognizedError if the transaction can be applied to the current state.

check_majority_possible_with_new_voter

def check_majority_possible_with_new_voter(
        votes_by_participant: Dict[str, BaseTxPayload],
        new_voter: str,
        new_vote: BaseTxPayload,
        nb_participants: int,
        exception_cls: Type[ABCIAppException] = ABCIAppException) -> None

Check that a Byzantine majority is achievable, once a new vote is added.

Arguments:

  • votes_by_participant: a mapping from a participant to its vote, before the new vote is added
  • new_voter: the new voter
  • new_vote: the new vote
  • nb_participants: the total number of participants
  • exception_cls: the class of the exception to raise in case the check fails.

Raises:

  • None: exception_cls: in case the check does not pass.

check_majority_possible

def check_majority_possible(
        votes_by_participant: Dict[str, BaseTxPayload],
        nb_participants: int,
        exception_cls: Type[ABCIAppException] = ABCIAppException) -> None

Check that a Byzantine majority is still achievable.

The idea is that, even if all the votes have not been delivered yet, it can be deduced whether a quorum cannot be reached due to divergent preferences among the voters and due to a too small number of other participants whose vote has not been delivered yet.

The check fails iff:

nb_remaining_votes + largest_nb_votes < quorum

That is, if the number of remaining votes is not enough to make the most voted item so far to exceed the quorum.

Preconditions on the input: - the size of votes_by_participant should not be greater than "nb_participants - 1" voters - new voter must not be in the current votes_by_participant

Arguments:

  • votes_by_participant: a mapping from a participant to its vote
  • nb_participants: the total number of participants
  • exception_cls: the class of the exception to raise in case the check fails.

Raises:

  • exception_cls: in case the check does not pass.

is_majority_possible

def is_majority_possible(votes_by_participant: Dict[str, BaseTxPayload],
                         nb_participants: int) -> bool

Return true if a Byzantine majority is achievable, false otherwise.

Arguments:

  • votes_by_participant: a mapping from a participant to its vote
  • nb_participants: the total number of participants

Returns:

True if the majority is still possible, false otherwise.

check_payload

@abstractmethod
def check_payload(payload: BaseTxPayload) -> None

Check payload.

process_payload

@abstractmethod
def process_payload(payload: BaseTxPayload) -> None

Process payload.

DegenerateRound Objects

class DegenerateRound(AbstractRound, ABC)

This class represents the finished round during operation.

It is a sink round.

check_payload

def check_payload(payload: BaseTxPayload) -> None

Check payload.

process_payload

def process_payload(payload: BaseTxPayload) -> None

Process payload.

end_block

def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]

End block.

CollectionRound Objects

class CollectionRound(AbstractRound, ABC)

CollectionRound.

This class represents abstract logic for collection based rounds where the round object needs to collect data from different agents. The data might for example be from a voting round or estimation round.

_allow_rejoin_payloads is used to allow agents not currently active to deliver a payload.

__init__

def __init__(*args: Any, **kwargs: Any)

Initialize the collection round.

serialize_collection

@staticmethod
def serialize_collection(
        collection: DeserializedCollection) -> SerializedCollection

Deserialize a serialized collection.

deserialize_collection

@staticmethod
def deserialize_collection(
        serialized: SerializedCollection) -> DeserializedCollection

Deserialize a serialized collection.

serialized_collection

@property
def serialized_collection() -> SerializedCollection

A collection with the addresses mapped to serialized payloads.

accepting_payloads_from

@property
def accepting_payloads_from() -> FrozenSet[str]

Accepting from the active set, or also from (re)joiners

payloads

@property
def payloads() -> List[BaseTxPayload]

Get all agent payloads

payload_values_count

@property
def payload_values_count() -> Counter

Get count of payload values.

process_payload

def process_payload(payload: BaseTxPayload) -> None

Process payload.

check_payload

def check_payload(payload: BaseTxPayload) -> None

Check Payload

_CollectUntilAllRound Objects

class _CollectUntilAllRound(CollectionRound, ABC)

_CollectUntilAllRound

This class represents abstract logic for when rounds need to collect payloads from all agents.

This round should only be used when non-BFT behaviour is acceptable.

check_payload

def check_payload(payload: BaseTxPayload) -> None

Check Payload

process_payload

def process_payload(payload: BaseTxPayload) -> None

Process payload.

collection_threshold_reached

@property
def collection_threshold_reached() -> bool

Check that the collection threshold has been reached.

CollectDifferentUntilAllRound Objects

class CollectDifferentUntilAllRound(_CollectUntilAllRound, ABC)

CollectDifferentUntilAllRound

This class represents logic for rounds where a round needs to collect different payloads from each agent.

This round should only be used for registration of new agents when there is synchronization of the db.

check_payload

def check_payload(payload: BaseTxPayload) -> None

Check Payload

CollectSameUntilAllRound Objects

class CollectSameUntilAllRound(_CollectUntilAllRound, ABC)

This class represents logic for when a round needs to collect the same payload from all the agents.

This round should only be used for registration of new agents when there is no synchronization of the db.

check_payload

def check_payload(payload: BaseTxPayload) -> None

Check Payload

common_payload

@property
def common_payload() -> Any

Get the common payload among the agents.

common_payload_values

@property
def common_payload_values() -> Tuple[Any, ...]

Get the common payload among the agents.

CollectSameUntilThresholdRound Objects

class CollectSameUntilThresholdRound(CollectionRound, ABC)

CollectSameUntilThresholdRound

This class represents logic for rounds where a round needs to collect same payload from k of n agents.

done_event is emitted when a) the collection threshold (k of n) is reached, and b) the most voted payload has non-empty attributes. In this case all payloads are saved under collection_key and the most voted payload attributes are saved under selection_key.

none_event is emitted when a) the collection threshold (k of n) is reached, and b) the most voted payload has only empty attributes.

no_majority_event is emitted when it is impossible to reach a k of n majority.

threshold_reached

@property
def threshold_reached() -> bool

Check if the threshold has been reached.

most_voted_payload

@property
def most_voted_payload() -> Any

Get the most voted payload value.

Kept for backward compatibility.

most_voted_payload_values

@property
def most_voted_payload_values() -> Tuple[Any, ...]

Get the most voted payload values.

end_block

def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]

Process the end of the block.

OnlyKeeperSendsRound Objects

class OnlyKeeperSendsRound(AbstractRound, ABC)

OnlyKeeperSendsRound

This class represents logic for rounds where only one agent sends a payload.

done_event is emitted when a) the keeper payload has been received and b) the keeper payload has non-empty attributes. In this case all attributes are saved under payload_key.

fail_event is emitted when a) the keeper payload has been received and b) the keeper payload has only empty attributes

process_payload

def process_payload(payload: BaseTxPayload) -> None

Handle a deploy safe payload.

check_payload

def check_payload(payload: BaseTxPayload) -> None

Check a deploy safe payload can be applied to the current state.

end_block

def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]

Process the end of the block.

VotingRound Objects

class VotingRound(CollectionRound, ABC)

VotingRound

This class represents logic for rounds where a round needs votes from agents. Votes are in the form of True (positive), False (negative) and None (abstain). The round ends when k of n agents make the same vote.

done_event is emitted when a) the collection threshold (k of n) is reached with k positive votes. In this case all payloads are saved under collection_key.

negative_event is emitted when a) the collection threshold (k of n) is reached with k negative votes.

none_event is emitted when a) the collection threshold (k of n) is reached with k abstain votes.

no_majority_event is emitted when it is impossible to reach a k of n majority for either of the options.

vote_count

@property
def vote_count() -> Counter

Get agent payload vote count

positive_vote_threshold_reached

@property
def positive_vote_threshold_reached() -> bool

Check that the vote threshold has been reached.

negative_vote_threshold_reached

@property
def negative_vote_threshold_reached() -> bool

Check that the vote threshold has been reached.

none_vote_threshold_reached

@property
def none_vote_threshold_reached() -> bool

Check that the vote threshold has been reached.

end_block

def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]

Process the end of the block.

CollectDifferentUntilThresholdRound Objects

class CollectDifferentUntilThresholdRound(CollectionRound, ABC)

CollectDifferentUntilThresholdRound

This class represents logic for rounds where a round needs to collect different payloads from k of n agents.

done_event is emitted when a) the required block confirmations have been met, and b) the collection threshold (k of n) is reached. In this case all payloads are saved under collection_key.

Extended required_block_confirmations to allow for arrival of more payloads.

collection_threshold_reached

@property
def collection_threshold_reached() -> bool

Check if the threshold has been reached.

end_block

def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]

Process the end of the block.

CollectNonEmptyUntilThresholdRound Objects

class CollectNonEmptyUntilThresholdRound(CollectDifferentUntilThresholdRound,
                                         ABC)

CollectNonEmptyUntilThresholdRound

This class represents logic for rounds where a round needs to collect optionally different payloads from k of n agents, where we only keep the non-empty attributes.

done_event is emitted when a) the required block confirmations have been met, b) the collection threshold (k of n) is reached, and c) some non-empty attribute values have been collected. In this case all payloads are saved under collection_key. Under selection_key the non-empty attribute values are stored.

none_event is emitted when a) the required block confirmations have been met, b) the collection threshold (k of n) is reached, and c) no non-empty attribute values have been collected.

Attention: A none_event might be triggered even though some of the remaining n-k agents might send non-empty attributes! Extended required_block_confirmations can alleviate this somewhat.

end_block

def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]

Process the end of the block.

TimeoutEvent Objects

@dataclass(order=True)
class TimeoutEvent(Generic[EventType])

Timeout event.

Timeouts Objects

class Timeouts(Generic[EventType])

Class to keep track of pending timeouts.

__init__

def __init__() -> None

Initialize.

size

@property
def size() -> int

Get the size of the timeout queue.

add_timeout

def add_timeout(deadline: datetime.datetime, event: EventType) -> int

Add a timeout.

cancel_timeout

def cancel_timeout(entry_count: int) -> None

Remove a timeout.

Arguments:

  • entry_count: the entry id to remove.

Raises:

  • None: KeyError: if the entry count is not found.

pop_earliest_cancelled_timeouts

def pop_earliest_cancelled_timeouts() -> None

Pop earliest cancelled timeouts.

get_earliest_timeout

def get_earliest_timeout() -> Tuple[datetime.datetime, Any]

Get the earliest timeout-event pair.

pop_timeout

def pop_timeout() -> Tuple[datetime.datetime, Any]

Remove and return the earliest timeout-event pair.

_MetaAbciApp Objects

class _MetaAbciApp(ABCMeta)

A metaclass that validates AbciApp's attributes.

__new__

def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
            **kwargs: Any) -> Type

Initialize the class.

BackgroundAppType Objects

class BackgroundAppType(Enum)

The type of a background app.

Please note that the values correspond to the priority in which the background apps should be processed when updating rounds.

correct_types

@staticmethod
def correct_types() -> Set[str]

Return the correct types only.

BackgroundAppConfig Objects

@dataclass(frozen=True)
class BackgroundAppConfig(Generic[EventType])

Necessary configuration for a background app.

For a deeper understanding of the various types of background apps and how the config influences the generated background app's type, please refer to the BackgroundApp class. The specify_type method provides further insight on the subject matter.

BackgroundApp Objects

class BackgroundApp(Generic[EventType])

A background app.

__init__

def __init__(config: BackgroundAppConfig) -> None

Initialize the BackgroundApp.

__eq__

def __eq__(other: Any) -> bool

Custom equality comparing operator.

__hash__

def __hash__() -> int

Custom hashing operator

specify_type

def specify_type() -> BackgroundAppType

Specify the type of the background app.

setup

def setup(initial_synchronized_data: BaseSynchronizedData,
          context: SkillContext) -> None

Set up the background round.

background_round

@property
def background_round() -> AbstractRound

Get the background round.

process_transaction

def process_transaction(transaction: Transaction, dry: bool = False) -> bool

Process a transaction.

TransitionBackup Objects

@dataclass
class TransitionBackup()

Holds transition related information as a backup in case we want to transition back from a background app.

AbciApp Objects

class AbciApp(Generic[EventType], ABC, metaclass=_MetaAbciApp)

Base class for ABCI apps.

Concrete classes of this class implement the ABCI App.

__init__

def __init__(synchronized_data: BaseSynchronizedData, logger: logging.Logger,
             context: SkillContext)

Initialize the AbciApp.

is_abstract

@classmethod
def is_abstract(cls) -> bool

Return if the abci app is abstract.

add_background_app

@classmethod
def add_background_app(cls, config: BackgroundAppConfig) -> Type["AbciApp"]

Sets the background related class variables.

For a deeper understanding of the various types of background apps and how the inputs of this method influence the generated background app's type, please refer to the BackgroundApp class. The specify_type method provides further insight on the subject matter.

Arguments:

  • config: the background app's configuration.

Returns:

the AbciApp with the new background app contained in the background_apps set.

synchronized_data

@property
def synchronized_data() -> BaseSynchronizedData

Return the current synchronized data.

get_all_rounds

@classmethod
def get_all_rounds(cls) -> Set[AppState]

Get all the round states.

get_all_events

@classmethod
def get_all_events(cls) -> Set[EventType]

Get all the events.

get_all_round_classes

@classmethod
def get_all_round_classes(
        cls,
        bg_round_cls: Set[Type[AbstractRound]],
        include_background_rounds: bool = False) -> Set[AppState]

Get all round classes.

bg_apps_prioritized

@property
def bg_apps_prioritized() -> Tuple[List[BackgroundApp], ...]

Get the background apps grouped and prioritized by their types.

last_timestamp

@property
def last_timestamp() -> datetime.datetime

Get last timestamp.

setup

def setup() -> None

Set up the behaviour.

schedule_round

def schedule_round(round_cls: AppState) -> None

Schedule a round class.

this means: - cancel timeout events belonging to the current round; - instantiate the new round class and set it as current round; - create new timeout events and schedule them according to the latest timestamp.

Arguments:

  • round_cls: the class of the new round.

current_round

@property
def current_round() -> AbstractRound

Get the current round.

current_round_id

@property
def current_round_id() -> Optional[str]

Get the current round id.

current_round_height

@property
def current_round_height() -> int

Get the current round height.

last_round_id

@property
def last_round_id() -> Optional[str]

Get the last round id.

is_finished

@property
def is_finished() -> bool

Check whether the AbciApp execution has finished.

latest_result

@property
def latest_result() -> Optional[BaseSynchronizedData]

Get the latest result of the round.

cleanup_timeouts

def cleanup_timeouts() -> None

Remove all timeouts.

Note that this is method is meant to be used only when performing recovery. Calling it in normal execution will result in unexpected behaviour.

check_transaction

def check_transaction(transaction: Transaction) -> None

Check a transaction.

process_transaction

def process_transaction(transaction: Transaction, dry: bool = False) -> None

Process a transaction.

The background rounds run concurrently with other (normal) rounds. First we check if the transaction is meant for a background round, if not we forward it to the current round object.

Arguments:

  • transaction: the transaction.
  • dry: whether the transaction should only be checked and not processed.

process_event

def process_event(event: EventType,
                  result: Optional[BaseSynchronizedData] = None) -> None

Process a round event.

update_time

def update_time(timestamp: datetime.datetime) -> None

Observe timestamp from last block.

Arguments:

  • timestamp: the latest block's timestamp.

cleanup

def cleanup(cleanup_history_depth: int,
            cleanup_history_depth_current: Optional[int] = None) -> None

Clear data.

cleanup_current_histories

def cleanup_current_histories(cleanup_history_depth_current: int) -> None

Reset the parameter histories for the current entry (period), keeping only the latest values for each parameter.

OffenseType Objects

class OffenseType(Enum)

The types of offenses.

The values of the enum represent the seriousness of the offence. Offense types with values >1000 are considered serious. See also is_light_offence and is_serious_offence functions.

is_light_offence

def is_light_offence(offence_type: OffenseType) -> bool

Check if an offence type is light.

is_serious_offence

def is_serious_offence(offence_type: OffenseType) -> bool

Check if an offence type is serious.

light_offences

def light_offences() -> Iterator[OffenseType]

Get the light offences.

serious_offences

def serious_offences() -> Iterator[OffenseType]

Get the serious offences.

AvailabilityWindow Objects

class AvailabilityWindow()

A cyclic array with a maximum length that holds boolean values.

When an element is added to the array and the maximum length has been reached, the oldest element is removed. Two attributes num_positive and num_negative reflect the number of positive and negative elements in the AvailabilityWindow, they are updated every time a new element is added.

__init__

def __init__(max_length: int) -> None

Initializes the AvailabilityWindow instance.

Arguments:

  • max_length: the maximum length of the cyclic array.

__eq__

def __eq__(other: Any) -> bool

Compare AvailabilityWindow objects.

has_bad_availability_rate

def has_bad_availability_rate(threshold: float = 0.95) -> bool

Whether the agent on which the window belongs to has a bad availability rate or not.

add

def add(value: bool) -> None

Adds a new boolean value to the cyclic array.

If the maximum length has been reached, the oldest element is removed.

Arguments:

  • value: The boolean value to add to the cyclic array.

to_dict

def to_dict() -> Dict[str, int]

Returns a dictionary representation of the AvailabilityWindow instance.

from_dict

@classmethod
def from_dict(cls, data: Dict[str, int]) -> "AvailabilityWindow"

Initializes an AvailabilityWindow instance from a dictionary.

OffenceStatus Objects

@dataclass
class OffenceStatus()

A class that holds information about offence status for an agent.

slash_amount

def slash_amount(light_unit_amount: int, serious_unit_amount: int) -> int

Get the slash amount of the current status.

OffenseStatusEncoder Objects

class OffenseStatusEncoder(json.JSONEncoder)

A custom JSON encoder for the offence status dictionary.

default

def default(o: Any) -> Any

The default JSON encoder.

OffenseStatusDecoder Objects

class OffenseStatusDecoder(json.JSONDecoder)

A custom JSON decoder for the offence status dictionary.

__init__

def __init__(*args: Any, **kwargs: Any) -> None

Initialize the custom JSON decoder.

hook

@staticmethod
def hook(
    data: Dict[str, Any]
) -> Union[AvailabilityWindow, OffenceStatus, Dict[str, OffenceStatus]]

Perform the custom decoding.

PendingOffense Objects

@dataclass(frozen=True, eq=True)
class PendingOffense()

A dataclass to represent offences that need to be addressed.

__post_init__

def __post_init__() -> None

Post initialization for offence type conversion in case it is given as an int.

SlashingNotConfiguredError Objects

class SlashingNotConfiguredError(Exception)

Custom exception raised when slashing configuration is requested but is not available.

DEFAULT_PENDING_OFFENCE_TTL

1 hour

RoundSequence Objects

class RoundSequence()

This class represents a sequence of rounds

It is a generic class that keeps track of the current round of the consensus period. It receives 'deliver_tx' requests from the ABCI handlers and forwards them to the current active round instance, which implements the ABCI app logic. It also schedules the next round (if any) whenever a round terminates.

__init__

def __init__(context: SkillContext, abci_app_cls: Type[AbciApp])

Initialize the round.

enable_slashing

def enable_slashing() -> None

Enable slashing.

validator_to_agent

@property
def validator_to_agent() -> Dict[str, str]

Get the mapping of the validators' addresses to their agent addresses.

validator_to_agent

@validator_to_agent.setter
def validator_to_agent(validator_to_agent: Dict[str, str]) -> None

Set the mapping of the validators' addresses to their agent addresses.

offence_status

@property
def offence_status() -> Dict[str, OffenceStatus]

Get the mapping of the agents' addresses to their offence status.

offence_status

@offence_status.setter
def offence_status(offence_status: Dict[str, OffenceStatus]) -> None

Set the mapping of the agents' addresses to their offence status.

add_pending_offence

def add_pending_offence(pending_offence: PendingOffense) -> None

Add a pending offence to the set of pending offences.

Pending offences are offences that have been detected, but not yet agreed upon by the consensus. A pending offence is removed from the set of pending offences and added to the OffenceStatus of a validator when the majority of the agents agree on it.

Arguments:

  • pending_offence: the pending offence to add

Returns:

None

sync_db_and_slashing

def sync_db_and_slashing(serialized_db_state: str) -> None

Sync the database and the slashing configuration.

serialized_offence_status

def serialized_offence_status() -> str

Serialize the offence status.

store_offence_status

def store_offence_status() -> None

Store the serialized offence status.

get_agent_address

def get_agent_address(validator: Validator) -> str

Get corresponding agent address from a Validator instance.

setup

def setup(*args: Any, **kwargs: Any) -> None

Set up the round sequence.

Arguments:

  • args: the arguments to pass to the round constructor.
  • kwargs: the keyword-arguments to pass to the round constructor.

start_sync

def start_sync() -> None

Set _syncing_up flag to true.

if the _syncing_up flag is set to true, the async_act method won't be executed. For more details refer to https://github.com/valory-xyz/open-autonomy/issues/247#issuecomment-1012268656

end_sync

def end_sync() -> None

Set _syncing_up flag to false.

syncing_up

@property
def syncing_up() -> bool

Return if the app is in sync mode.

abci_app

@property
def abci_app() -> AbciApp

Get the AbciApp.

blockchain

@property
def blockchain() -> Blockchain

Get the Blockchain instance.

blockchain

@blockchain.setter
def blockchain(_blockchain: Blockchain) -> None

Get the Blockchain instance.

height

@property
def height() -> int

Get the height.

is_finished

@property
def is_finished() -> bool

Check if a round sequence has finished.

check_is_finished

def check_is_finished() -> None

Check if a round sequence has finished.

current_round

@property
def current_round() -> AbstractRound

Get current round.

current_round_id

@property
def current_round_id() -> Optional[str]

Get the current round id.

current_round_height

@property
def current_round_height() -> int

Get the current round height.

last_round_id

@property
def last_round_id() -> Optional[str]

Get the last round id.

last_timestamp

@property
def last_timestamp() -> datetime.datetime

Get the last timestamp.

last_round_transition_timestamp

@property
def last_round_transition_timestamp() -> datetime.datetime

Returns the timestamp for last round transition.

last_round_transition_height

@property
def last_round_transition_height() -> int

Returns the height for last round transition.

last_round_transition_root_hash

@property
def last_round_transition_root_hash() -> bytes

Returns the root hash for last round transition.

last_round_transition_tm_height

@property
def last_round_transition_tm_height() -> int

Returns the Tendermint height for last round transition.

latest_synchronized_data

@property
def latest_synchronized_data() -> BaseSynchronizedData

Get the latest synchronized_data.

root_hash

@property
def root_hash() -> bytes

Get the Merkle root hash of the application state.

This is going to be the database's hash. In this way, the app hash will be reflecting our application's state, and will guarantee that all the agents on the chain apply the changes of the arriving blocks in the same way.

Returns:

the root hash to be included as the Header.AppHash in the next block.

tm_height

@property
def tm_height() -> int

Get Tendermint's current height.

tm_height

@tm_height.setter
def tm_height(_tm_height: int) -> None

Set Tendermint's current height.

block_stall_deadline_expired

@property
def block_stall_deadline_expired() -> bool

Get if the deadline for not having received any begin block requests from the Tendermint node has expired.

set_block_stall_deadline

def set_block_stall_deadline() -> None

Use the local time of the agent and a predefined tolerance, to specify the expiration of the deadline.

init_chain

def init_chain(initial_height: int) -> None

Init chain.

begin_block

def begin_block(header: Header, evidences: Evidences,
                last_commit_info: LastCommitInfo) -> None

Begin block.

deliver_tx

def deliver_tx(transaction: Transaction) -> None

Deliver a transaction.

Appends the transaction to build the block on 'end_block' later.

Arguments:

  • transaction: the transaction.

Raises:

  • None: an Error otherwise.

end_block

def end_block() -> None

Process the 'end_block' request.

commit

def commit() -> None

Process the 'commit' request.

reset_blockchain

def reset_blockchain(is_replay: bool = False, is_init: bool = False) -> None

Reset blockchain after tendermint reset.

Arguments:

  • is_replay: whether we are resetting the blockchain while replaying blocks.
  • is_init: whether to process blocks before receiving an init_chain req from tendermint.

reset_state

def reset_state(restart_from_round: str,
                round_count: int,
                serialized_db_state: Optional[str] = None) -> None

This method resets the state of RoundSequence to the beginning of the period.

Note: This is intended to be used for agent <-> tendermint communication recovery only!

Arguments:

  • restart_from_round: from which round to restart the abci. This round should be the first round in the last period.
  • round_count: the round count at the beginning of the period -1.
  • serialized_db_state: the state of the database at the beginning of the period. If provided, the database will be reset to this state.

PendingOffencesPayload Objects

@dataclass(frozen=True)
class PendingOffencesPayload(BaseTxPayload)

Represent a transaction payload for pending offences.

PendingOffencesRound Objects

class PendingOffencesRound(CollectSameUntilThresholdRound)

Defines the pending offences background round, which runs concurrently with other rounds to sync the offences.

__init__

def __init__(*args: Any, **kwargs: Any) -> None

Initialize the PendingOffencesRound.

offence_status

@property
def offence_status() -> Dict[str, OffenceStatus]

Get the offence status from the round sequence.

end_block

def end_block() -> None

Process the end of the block for the pending offences background round.

It is important to note that this is a non-standard type of round, meaning it does not emit any events. Instead, it continuously runs in the background. The objective of this round is to consistently monitor the received pending offences and achieve a consensus among the agents.