The AsyncBehaviour class
For clarity, the snippets of code presented here are a simplified version of the actual implementation. We refer the reader to the Open Autonomy API for the complete details.
AsyncBehaviour class, introduced in the
Skill, is a mixin class that allows the AEA developer to use
asynchronous programming patterns in a
Behaviour implementation. Since it is usual that many of the tasks to be carried by the state behaviours are long-running, this is the base class from which FSM Behaviours will be typically derived from.
The Need for Asynchronous Behaviours
The main motivation behind the
AsyncBehaviour utility class
is that in idiomatic AEA behaviour development, the
cannot contain blocking code or long-running tasks, as otherwise
the AEA main thread that executes all the behaviours would get stuck.
For example, in order to interact with an external component through
a request-response pattern, e.g., sending a request to an HTTP server and waiting for its response, or
request the Decision Maker to sign a transaction. The usual approach in this case is to:
- Send the message from the
act()method and update the state into "waiting for the response" (e.g., by updating an attribute in the shared state or in the behaviour instance, or by using the state pattern), such that the next call to
act()can be intercepted and controlled by the developer.
- Receive the response in a concrete
Handlerobject that is registered to process messages of the response protocol.
- Handle the response in the handler's
handle()method according to the skill business logic and the current state behaviour, and notify the behaviour about the change of state (e.g. by updating an attribute in the shared state such that the next
actcall can read it and take a different execution path).
For large and complex skills, this development approach is quite error-prone and expensive in terms of maintainability, as the business logic does not reside in a single skill component (i.e., in a behaviour class), but also in several other skill components (i.e., in the handler classes, one for each interaction protocol required by the behaviour).
Asynchronous Programming to the Rescue
A well-known programming technique that turned out very useful in the web development community is asynchronous programming.
Informally, a programming language that supports asynchronous programming allows running blocking operations asynchronously: the operation is not run in the same thread where the call happened, but it is delegated to another executor, e.g., another thread/process, allowing the caller function execution being "suspended" until the operation has completed. Once the blocking operation has completed, the execution of the function can process the result and continue as usual. This lets the main thread to perform other tasks while the function is waiting for the result of the operation.
If the reader is not familiar with asynchronous programming concepts, we suggest reading the following resources:
- MDN Web Docs: Glossary: asynchronous
asynciostandard Python library documentation
The behaviour execution model of the AEA framework is the following.
At the AEA startup, the framework registers a periodic task, one for each
b, that executes the
b.act method. Such periodic
execution for behaviour
b is scheduled in the main thread loop,
with a tick interval
b.tick_interval and starting time
As mentioned above, the code in
act() should not be blocking,
as otherwise it would block the main thread execution, and
therefore it would prevent the execution of the other behaviours'
and the processing of incoming messages.
class SimpleBehaviour(Behaviour, ABC): """This class implements a simple behaviour.""" def act(self) -> None: """Do the action.""" # ...
AsyncBehaviour utility class allows to wrap the execution
act() method allowing its execution to be "suspended"
and resumed upon the happening of certain events
(e.g. the receiving of a message, a sleep timeout etc.).
Currently, the crux of the implementation is the
Python built-in generator machinery:
- from the developer perspective, the execution can be suspended by using
yield fromexpressions. This will return a generator object to the framework, which can opportunely be stored in an object attribute;
- from the framework perspective, the execution can be resumed by "sending" a
value to the generator object, using the
send()method of the generator object. The value can be
None, or a message sent by another skill component.
class AsyncBehaviour(ABC): @abstractmethod def async_act(self) -> Generator: """Do the act, supporting asynchronous execution.""" @abstractmethod def async_act_wrapper(self) -> Generator: """Do the act, supporting asynchronous execution.""" # ...
The abstract methods the developer should implement are called
The sequence diagram below gives the idea of what happens when the
act() method of an
AsyncBehaviour is called:
self.gen.send(None) note over AsyncBehaviour: state RUNNING else state == RUNNING note over AsyncBehaviour: self.gen.send(None) else StopIteration note over AsyncBehaviour: state READY end AsyncBehaviour->>Main loop: (return) end
In words, the first time the
act() method is called:
- first, it creates the generator object by calling the used-defined
- it triggers the first execution by sending the
- it returns the execution control at the first
- sets the state to
- returns to the caller in the main loop.
Any subsequent calls to the
act() method are redirected to the generator
whose execution was triggered by the first call, which invokes
A simple example
Consider a (one-shot) behaviour whose logic is to print a sequence of messages separated by a sleep interval:
class PrintBehaviour(OneShotBehaviour, AsyncBehaviour): def async_act_wrapper(self): yield from self.async_act() def async_act(self): print("First message") yield from self.sleep(1.0) print("Second message") yield from self.sleep(1.0) print("Third message")
AsyncBehaviour, one should take care of:
- remembering the "state" of the behaviour (i.e. what is the last message printed)
- handling the sleep interval by hand
This is a naive implementation
import datetime from aea.skills.behaviours import SimpleBehaviour class PrintBehaviour(SimpleBehaviour): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.state = 0 # remember time of last printed message self.last_time = None # timedelta of 0 days, 1 second self.timedelta = datetime.timedelta(0, 1) def act(self): now = datetime.datetime.now() if self.state == 0: print("First message") self.state += 1 self.last_time = now return if self.state == 1 and now > (self.last_time + self.timedelta): print("Second message") self.state += 1 self.last_time = now return if self.state == 2 and now > (self.last_time + self.timedelta): print("Third message") self.state += 1 self.last_time = now return # do nothing
As explained above, one of the common tasks for a behaviour is
to interact with other services and/or agents via message-based
communication. In this section, we focus on a sequence of
request-response interactions through agent interaction protocols.
We consider the
fetchai/generic_buyer skill as an example (link to code).
The idiomatic approach
The idiomatic approach, implemented in the skill
is outlined in the sequence diagram below. The suffix
B is a shorthand
H is a shorthand for
DecisionMaker are internal components of the buyer AEA,
Ledger are external actors.
Follows the breakdown of each message exchange:
- The buyer starts by searching for seller of the desired data,
and the search behaviour (
SearchB) sends a search request to an agent discovery service (ADS);
- The search result of the ADS gets routed to the search handler (
SearchH), which selects one of the sellers, and sends a "call for proposal" (CFP) message to him. The CFP is the first message of a FIPA protocol interaction. See the AEA documentation on the AEA FIPA-like protocol.
- The seller replies with a "FIPA proposal" to the buyer. Such message
is handled by the
- Once the negotiation has completed (only the
FipaHis involved in the negotiation), the
FipaHhandler sends the payment transaction to the
TransactionBbehaviour such that it can be processed;
TransactionB, which was periodically listening for new transaction to process, reads the new transaction and sends a signing requests to the DecisionMaker. Note that a skill component does not have access to the crypto identity of an AEA, and it has to rely on the DecisionMaker for certain operations, such as the signing of transactions.
DecisionMakersends the response to the dedicated handler, the
SigningHsubmit the transactions to the
Ledger's response (the transaction hash) is handled by the
LedgerHhandler, which in turn sends the transaction hash to the
Seller, once the transaction has been validated, will send the bought data to the buyer with an FIPA "inform" message, which is handled by the
The business logic is spread across different skill components, behaviours and handlers, due to the "forced callback" mechanism that forces the developer to handle the message of an interaction protocol in the handler registered for that protocol.
The above example can be reimplemented in an
the following way (Python-pseudocode):
class GenericBuyerBehaviour(OneShotBehaviour, AsyncBehaviour): def async_act_wrapper(self): yield from self.async_act() def async_act(self): search_request = build_search_request(...) # send search request to the ADS # and (asynchronously) wait for the response response = yield from send(search_request) agents = response.result # pick the first agent in the result list seller = agents # send CFP to the seller # and (asynchronously) wait for the response cfp = build_cfp(...) response = yield from send(cfp, to=seller) # here there should be the buyer strategy # for the negotiation with the seller... # ... # in case both parties accept the negotiation outcome: tx = build_tx(...) # send transaction to the decision maker # and (asynchronously) wait for the response signed_tx = yield from send(tx) # send transaction to the distributed ledger # and (asynchronously) wait for the response tx_hash = yield from send(signed_tx) # send transaction hash to the seller send(tx_hash, to=seller) # wait until the seller sends the data inform_message = yield from self.wait_for_message() print(inform_message.data) # done!
As you can see, the core business logic of the buyer resides in the
method. Many details of the implementation are omitted, like
the utility functions like
but they are conceptually similar to what is done in the handlers of the
wait_for_message method, uses the
send(...) method to wait for the
response, allowing it to wait for specific events triggered
by other components. In this case, each of the handlers will
dispatch the response to the requester component, whose request
is identified by the (dialogue) identifier
of the interaction.
However, note that the handler code in this case is skill-independent,
which means that they do not contribute to the business logic.