The AsyncBehaviour class
Note
For clarity, the snippets of code presented here are a simplified version of the actual implementation. We refer the reader to the Open Autonomy API for the complete details.
The AsyncBehaviour
class, introduced in the valory/abstract_round_abci
Skill, is a mixin class that allows the AEA developer to use
asynchronous programming patterns in a Behaviour
implementation. Since it is usual that many of the tasks to be carried by the state behaviours are long-running, this is the base class from which FSM Behaviours will be typically derived from.
The Need for Asynchronous Behaviours
The main motivation behind the AsyncBehaviour
utility class
is that in idiomatic AEA behaviour development, the act
method
cannot contain blocking code or long-running tasks, as otherwise
the AEA main thread that executes all the behaviours would get stuck.
For example, in order to interact with an external component through
a request-response pattern, e.g., sending a request to an HTTP server and waiting for its response, or
request the Decision Maker to sign a transaction. The usual approach in this case is to:
- Send the message from the
act()
method and update the state into "waiting for the response" (e.g., by updating an attribute in the shared state or in the behaviour instance, or by using the state pattern), such that the next call toact()
can be intercepted and controlled by the developer. - Receive the response in a concrete
Handler
object that is registered to process messages of the response protocol. - Handle the response in the handler's
handle()
method according to the skill business logic and the current state behaviour, and notify the behaviour about the change of state (e.g. by updating an attribute in the shared state such that the nextact
call can read it and take a different execution path).
For large and complex skills, this development approach is quite error-prone and expensive in terms of maintainability, as the business logic does not reside in a single skill component (i.e., in a behaviour class), but also in several other skill components (i.e., in the handler classes, one for each interaction protocol required by the behaviour).
Asynchronous Programming to the Rescue
A well-known programming technique that turned out very useful in the web development community is asynchronous programming.
Informally, a programming language that supports asynchronous programming allows running blocking operations asynchronously: the operation is not run in the same thread where the call happened, but it is delegated to another executor, e.g., another thread/process, allowing the caller function execution being "suspended" until the operation has completed. Once the blocking operation has completed, the execution of the function can process the result and continue as usual. This lets the main thread to perform other tasks while the function is waiting for the result of the operation.
If the reader is not familiar with asynchronous programming concepts, we suggest reading the following resources:
- MDN Web Docs: Asynchronous JavaScript
- MDN Web Docs: Glossary: asynchronous
asyncio
standard Python library documentation
How AsyncBehaviour
works
The behaviour execution model of the AEA framework is the following.
At the AEA startup, the framework registers a periodic task, one for each
Behaviour
object b
, that executes the b.act
method. Such periodic
execution for behaviour b
is scheduled in the main thread loop,
with a tick interval b.tick_interval
and starting time b.start_at
.
As mentioned above, the code in act()
should not be blocking,
as otherwise it would block the main thread execution, and
therefore it would prevent the execution of the other behaviours' act()
and the processing of incoming messages.
class SimpleBehaviour(Behaviour, ABC):
"""This class implements a simple behaviour."""
def act(self) -> None:
"""Do the action."""
# (...)
The AsyncBehaviour
utility class allows to wrap the execution
of the act()
method allowing its execution to be "suspended"
and resumed upon the happening of certain events
(e.g. the receiving of a message, a sleep timeout etc.).
Currently, the crux of the implementation is the
Python built-in generator machinery:
- from the developer perspective, the execution can be suspended by using
yield
oryield from
expressions. This will return a generator object to the framework, which can opportunely be stored in an object attribute; - from the framework perspective, the execution can be resumed by "sending" a
value to the generator object, using the
send()
method of the generator object. The value can beNone
, or a message sent by another skill component.
class AsyncBehaviour(ABC):
@abstractmethod
def async_act(self) -> Generator:
"""Do the act, supporting asynchronous execution."""
@abstractmethod
def async_act_wrapper(self) -> Generator:
"""Do the act, supporting asynchronous execution."""
# (...)
The abstract methods the developer should implement are called
async_act_wrapper
and async_act
.
The sequence diagram below gives the idea of what happens when the
act()
method of an AsyncBehaviour
is called:
self.gen.send(None) note over AsyncBehaviour: state RUNNING else state == RUNNING note over AsyncBehaviour: self.gen.send(None) else StopIteration note over AsyncBehaviour: state READY end AsyncBehaviour->>Main loop: (return) end
In words, the first time the act()
method is called:
- first, it creates the generator object by calling the used-defined
async_act_wrapper()
; - it triggers the first execution by sending the
None
value; - it returns the execution control at the first
yield
statement; - sets the state to
RUNNING
; - returns to the caller in the main loop.
Any subsequent calls to the act()
method are redirected to the generator
whose execution was triggered by the first call, which invokes async_act()
.
A simple example
Consider a (one-shot) behaviour whose logic is to print a sequence of messages separated by a sleep interval:
class PrintBehaviour(OneShotBehaviour, AsyncBehaviour):
def async_act_wrapper(self):
yield from self.async_act()
def async_act(self):
print("First message")
yield from self.sleep(1.0)
print("Second message")
yield from self.sleep(1.0)
print("Third message")
Without AsyncBehaviour
, one should take care of:
- remembering the "state" of the behaviour (i.e. what is the last message printed)
- handling the sleep interval by hand
This is a naive implementation
import datetime
from aea.skills.behaviours import SimpleBehaviour
class PrintBehaviour(SimpleBehaviour):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.state = 0
# remember time of last printed message
self.last_time = None
# timedelta of 0 days, 1 second
self.timedelta = datetime.timedelta(0, 1)
def act(self):
now = datetime.datetime.now()
if self.state == 0:
print("First message")
self.state += 1
self.last_time = now
return
if self.state == 1 and now > (self.last_time + self.timedelta):
print("Second message")
self.state += 1
self.last_time = now
return
if self.state == 2 and now > (self.last_time + self.timedelta):
print("Third message")
self.state += 1
self.last_time = now
return
# do nothing
Blocking requests
As explained above, one of the common tasks for a behaviour is
to interact with other services and/or agents via message-based
communication. In this section, we focus on a sequence of
request-response interactions through agent interaction protocols.
We consider the fetchai/generic_buyer
skill as an example (link to code).
The idiomatic approach
The idiomatic approach, implemented in the skill fetchai/generic_buyer
,
is outlined in the sequence diagram below. The suffix B
is a shorthand
for Behaviour
, and H
is a shorthand for Handler
.
The participants SearchB
, SearchH
, TransactionB
, SigningH
, LedgerH
, FipaH
and DecisionMaker
are internal components of the buyer AEA,
whereas ADS
, Seller
, and Ledger
are external actors.
Follows the breakdown of each message exchange:
- The buyer starts by searching for seller of the desired data,
and the search behaviour (
SearchB
) sends a search request to an agent discovery service (ADS); - The search result of the ADS gets routed to the search handler (
SearchH
), which selects one of the sellers, and sends a "call for proposal" (CFP) message to him. The CFP is the first message of a FIPA protocol interaction. See the AEA documentation on the AEA FIPA-like protocol. - The seller replies with a "FIPA proposal" to the buyer. Such message
is handled by the
FipaH
handler; - Once the negotiation has completed (only the
FipaH
is involved in the negotiation), theFipaH
handler sends the payment transaction to theTransactionB
behaviour such that it can be processed; - The
TransactionB
, which was periodically listening for new transaction to process, reads the new transaction and sends a signing requests to the DecisionMaker. Note that a skill component does not have access to the crypto identity of an AEA, and it has to rely on the DecisionMaker for certain operations, such as the signing of transactions. - The
DecisionMaker
sends the response to the dedicated handler, theSigningH
. TheSigningH
submit the transactions to theLedger
(through theledger_api
connection); - The
Ledger
's response (the transaction hash) is handled by theLedgerH
handler, which in turn sends the transaction hash to theSeller
- The
Seller
, once the transaction has been validated, will send the bought data to the buyer with an FIPA "inform" message, which is handled by theFipaH
handler.
The business logic is spread across different skill components, behaviours and handlers, due to the "forced callback" mechanism that forces the developer to handle the message of an interaction protocol in the handler registered for that protocol.
Using the AsyncBehaviour
The above example can be reimplemented in an AsyncBehaviour
in
the following way (Python-pseudocode):
class GenericBuyerBehaviour(OneShotBehaviour, AsyncBehaviour):
def async_act_wrapper(self):
yield from self.async_act()
def async_act(self):
search_request = build_search_request(...)
# send search request to the ADS
# and (asynchronously) wait for the response
response = yield from send(search_request)
agents = response.result
# pick the first agent in the result list
seller = agents[0]
# send CFP to the seller
# and (asynchronously) wait for the response
cfp = build_cfp(...)
response = yield from send(cfp, to=seller)
# here there should be the buyer strategy
# for the negotiation with the seller...
# (...)
# in case both parties accept the negotiation outcome:
tx = build_tx(...)
# send transaction to the decision maker
# and (asynchronously) wait for the response
signed_tx = yield from send(tx)
# send transaction to the distributed ledger
# and (asynchronously) wait for the response
tx_hash = yield from send(signed_tx)
# send transaction hash to the seller
send(tx_hash, to=seller)
# wait until the seller sends the data
inform_message = yield from self.wait_for_message()
print(inform_message.data)
# done!
As you can see, the core business logic of the buyer resides in the async_act
method. Many details of the implementation are omitted, like
the utility functions like build_*
and send
,
but they are conceptually similar to what is done in the handlers of the
fetchai/generic_buyer
skill.
The wait_for_message
method, uses the send(...)
method to wait for the
response, allowing it to wait for specific events triggered
by other components. In this case, each of the handlers will
dispatch the response to the requester component, whose request
is identified by the (dialogue) identifier
of the interaction.
However, note that the handler code in this case is skill-independent,
which means that they do not contribute to the business logic.