autonomy.analyse.logs.collection
Log streams
LogCollection Objects
class LogCollection(ABC)
Collection of logs.
__
init__
def __init__() -> None
Initialize object.
get_
avilable_
agents
@abstractmethod
def get_avilable_agents() -> List[str]
Returns a list of agent names.
create_
agent_
db
@abstractmethod
def create_agent_db(agent: str,
db: AgentLogsDB,
reset: bool = False) -> "LogCollection"
Create logs database.
get_
next_
log_
block
@staticmethod
def get_next_log_block(
fp: TextIO,
prev_line: Optional[str]) -> Tuple[Optional[str], Optional[str]]
Get next log block.
parse
@classmethod
def parse(cls, file: Path) -> Generator[LogRow, None, None]
Parse logs and yield rows.
FromDirectory Objects
class FromDirectory(LogCollection)
Log stream from directory.
__
init__
def __init__(directory: Path) -> None
Initialize object.
get_
avilable_
agents
def get_avilable_agents() -> List[str]
Returns a list of agent names.
create_
agent_
db
def create_agent_db(agent: str,
db: AgentLogsDB,
reset: bool = False) -> "FromDirectory"
Create logs table for agent.