Files
FusionAGI/fusionagi/governance/audit_log.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

30 lines
1.3 KiB
Python

"""Structured audit log for AGI."""
from typing import Any
from fusionagi.schemas.audit import AuditEntry, AuditEventType
from fusionagi._logger import logger
import uuid
class AuditLog:
def __init__(self, max_entries=100000):
self._entries = []
self._max_entries = max_entries
self._by_task = {}
self._by_type = {}
def append(self, event_type, actor, action="", task_id=None, payload=None, outcome=""):
entry_id = str(uuid.uuid4())
entry = AuditEntry(entry_id=entry_id, event_type=event_type, actor=actor, task_id=task_id, action=action, payload=payload or {}, outcome=outcome)
if len(self._entries) >= self._max_entries:
self._entries.pop(0)
idx = len(self._entries)
self._entries.append(entry)
if entry.task_id:
self._by_task.setdefault(entry.task_id, []).append(idx)
self._by_type.setdefault(entry.event_type.value, []).append(idx)
return entry_id
def get_by_task(self, task_id, limit=100):
indices = self._by_task.get(task_id, [])[-limit:]
return [self._entries[i] for i in indices if i < len(self._entries)]
def get_by_type(self, event_type, limit=100):
indices = self._by_type.get(event_type.value, [])[-limit:]
return [self._entries[i] for i in indices if i < len(self._entries)]