Files
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

100 lines
3.2 KiB
Python

"""Telemetry tracer: per-head latency, costs, event bus subscription."""
from collections import deque
from dataclasses import dataclass, field
from typing import Any
import time
from fusionagi._logger import logger
# Module-wide tracer slot read by get_tracer() and written by set_tracer(); starts as None.
_tracer: "TelemetryTracer | None" = None
@dataclass
class TraceEntry:
"""Single trace entry."""
event_type: str
task_id: str | None
head_id: str | None
latency_ms: float | None
payload: dict[str, Any]
timestamp: float = field(default_factory=time.monotonic)
class TelemetryTracer:
"""In-memory ring buffer for traces; subscribes to event bus."""
def __init__(self, max_entries: int = 10000) -> None:
self._entries: deque[TraceEntry] = deque(maxlen=max_entries)
self._subscription: Any = None
self._starts: dict[str, float] = {}
def subscribe(self, event_bus: Any) -> None:
"""Subscribe to event bus for message_received, dvadasa_complete."""
def on_message(_event_type: str, payload: dict[str, Any]) -> None:
task_id = payload.get("task_id", "")
recipient = payload.get("recipient", "")
self._starts[f"{task_id}:{recipient}"] = time.monotonic()
def on_dvadasa(_event_type: str, payload: dict[str, Any]) -> None:
task_id = payload.get("task_id", "")
head_count = payload.get("head_count", 0)
self._entries.append(
TraceEntry(
event_type="dvadasa_complete",
task_id=task_id,
head_id=None,
latency_ms=None,
payload={"head_count": head_count},
)
)
try:
event_bus.subscribe("message_received", on_message)
event_bus.subscribe("dvadasa_complete", on_dvadasa)
except Exception as e:
logger.warning("Telemetry subscribe failed", extra={"error": str(e)})
def record_head_output(self, task_id: str, head_id: str, start: float | None = None) -> None:
"""Record head completion with optional latency."""
key = f"{task_id}:{head_id}"
end = time.monotonic()
latency_ms = (end - self._starts.pop(key, end)) * 1000 if start is None else (end - start) * 1000
self._entries.append(
TraceEntry(
event_type="head_output",
task_id=task_id,
head_id=head_id,
latency_ms=latency_ms,
payload={},
)
)
def get_traces(self, task_id: str | None = None, limit: int = 100) -> list[dict[str, Any]]:
"""Return traces, optionally filtered by task_id."""
out = []
for e in reversed(self._entries):
if task_id and e.task_id != task_id:
continue
out.append({
"event_type": e.event_type,
"task_id": e.task_id,
"head_id": e.head_id,
"latency_ms": e.latency_ms,
"payload": e.payload,
})
if len(out) >= limit:
break
return out
def get_tracer() -> TelemetryTracer | None:
    """Fetch the module-wide tracer, or ``None`` if none has been installed."""
    return _tracer


def set_tracer(t: TelemetryTracer | None) -> None:
    """Install *t* as the module-wide tracer; passing ``None`` clears it."""
    global _tracer
    _tracer = t