Skip to content

packages.valory.connections.abci.tests.test_fuzz.mock_node.channels.tcp_channel

TcpChannel for MockNode

TcpChannel Objects

class TcpChannel(BaseChannel)

Implements BaseChannel to use TCP sockets

MAX_READ_IN_BYTES

Maximum number of bytes we'll consume on a read stream.

__init__

def __init__(**kwargs: Dict) -> None

Initializes a TcpChannel

:param: kwargs: keyword arguments:

- host: the host of the ABCI app (localhost by default)
- port: the port of the ABCI app (26658 by default)

is_connected

@property
def is_connected() -> bool

Check whether the channel is connected.

connect

def connect() -> None

Set up the channel.

disconnect

def disconnect() -> None

Tear down the channel.

send_info

def send_info(request: abci_types.RequestInfo) -> abci_types.ResponseInfo

Sends an info request.

:param: request: RequestInfo pb object :return: ResponseInfo pb object

send_echo

def send_echo(request: abci_types.RequestEcho) -> abci_types.ResponseEcho

Sends an echo request.

:param: request: RequestEcho pb object :return: ResponseEcho pb object

send_flush

def send_flush(request: abci_types.RequestFlush) -> abci_types.ResponseFlush

Sends a flush request.

:param: request: RequestFlush pb object :return: ResponseFlush pb object

send_set_option

def send_set_option(
        request: abci_types.RequestSetOption) -> abci_types.ResponseSetOption

Sends a setOption request.

:param: request: RequestSetOption pb object :return: ResponseSetOption pb object

send_deliver_tx

def send_deliver_tx(
        request: abci_types.RequestDeliverTx) -> abci_types.ResponseDeliverTx

Sends a deliverTx request.

:param: request: RequestDeliverTx pb object :return: ResponseDeliverTx pb object

send_check_tx

def send_check_tx(
        request: abci_types.RequestCheckTx) -> abci_types.ResponseCheckTx

Sends a checkTx request.

:param: request: RequestCheckTx pb object :return: ResponseCheckTx pb object

send_query

def send_query(request: abci_types.RequestQuery) -> abci_types.ResponseQuery

Sends a query request.

:param: request: RequestQuery pb object :return: ResponseQuery pb object

send_commit

def send_commit(
        request: abci_types.RequestCommit) -> abci_types.ResponseCommit

Sends a commit request.

:param: request: RequestCommit pb object :return: ResponseCommit pb object

send_init_chain

def send_init_chain(
        request: abci_types.RequestInitChain) -> abci_types.ResponseInitChain

Sends an initChain request.

:param: request: RequestInitChain pb object :return: ResponseInitChain pb object

send_begin_block

def send_begin_block(
        request: abci_types.RequestBeginBlock
) -> abci_types.ResponseBeginBlock

Sends a beginBlock request.

:param: request: RequestBeginBlock pb object :return: ResponseBeginBlock pb object

send_end_block

def send_end_block(
        request: abci_types.RequestEndBlock) -> abci_types.ResponseEndBlock

Sends an endBlock request.

:param: request: RequestEndBlock pb object :return: ResponseEndBlock pb object

send_list_snapshots

def send_list_snapshots(
    request: abci_types.RequestListSnapshots
) -> abci_types.ResponseListSnapshots

Sends a listSnapshots request.

:param: request: RequestListSnapshots pb object :return: ResponseListSnapshots pb object

send_offer_snapshot

def send_offer_snapshot(
    request: abci_types.RequestOfferSnapshot
) -> abci_types.ResponseOfferSnapshot

Sends an offerSnapshot request.

:param: request: RequestOfferSnapshot pb object :return: ResponseOfferSnapshot pb object

send_load_snapshot_chunk

def send_load_snapshot_chunk(
    request: abci_types.RequestLoadSnapshotChunk
) -> abci_types.ResponseLoadSnapshotChunk

Sends a loadSnapshotChunk request.

:param: request: RequestLoadSnapshotChunk pb object :return: ResponseLoadSnapshotChunk pb object

send_apply_snapshot_chunk

def send_apply_snapshot_chunk(
    request: abci_types.RequestApplySnapshotChunk
) -> abci_types.ResponseApplySnapshotChunk

Sends an applySnapshotChunk request.

:param: request: RequestApplySnapshotChunk pb object :return: ResponseApplySnapshotChunk pb object