Skip to content

autonomy.analyse.logs.collection

Log streams

LogCollection Objects

class LogCollection(ABC)

Collection of logs.

__init__

def __init__() -> None

Initialize object.

get_avilable_agents

@abstractmethod
def get_avilable_agents() -> List[str]

Returns a list of agent names.

create_agent_db

@abstractmethod
def create_agent_db(agent: str,
                    db: AgentLogsDB,
                    reset: bool = False) -> "LogCollection"

Create logs database.

get_next_log_block

@staticmethod
def get_next_log_block(
        fp: TextIO,
        prev_line: Optional[str]) -> Tuple[Optional[str], Optional[str]]

Get next log block.

parse

@classmethod
def parse(cls, file: Path) -> Generator[LogRow, None, None]

Parse logs and yield rows.

FromDirectory Objects

class FromDirectory(LogCollection)

Log stream from directory.

__init__

def __init__(directory: Path) -> None

Initialize object.

get_avilable_agents

def get_avilable_agents() -> List[str]

Returns a list of agent names.

create_agent_db

def create_agent_db(agent: str,
                    db: AgentLogsDB,
                    reset: bool = False) -> "FromDirectory"

Create logs table for agent.