100 lines
3.2 KiB
Python
100 lines
3.2 KiB
Python
"""Telemetry tracer: per-head latency, costs, event bus subscription."""
|
|
|
|
from collections import deque
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
import time
|
|
|
|
from fusionagi._logger import logger
|
|
|
|
# Module-wide tracer slot: returned by get_tracer() and replaced by set_tracer().
# Starts out as None, i.e. no tracer is installed until set_tracer() is called.
_tracer: "TelemetryTracer | None" = None
|
|
|
|
|
|
@dataclass
|
|
class TraceEntry:
|
|
"""Single trace entry."""
|
|
|
|
event_type: str
|
|
task_id: str | None
|
|
head_id: str | None
|
|
latency_ms: float | None
|
|
payload: dict[str, Any]
|
|
timestamp: float = field(default_factory=time.monotonic)
|
|
|
|
|
|
class TelemetryTracer:
|
|
"""In-memory ring buffer for traces; subscribes to event bus."""
|
|
|
|
def __init__(self, max_entries: int = 10000) -> None:
|
|
self._entries: deque[TraceEntry] = deque(maxlen=max_entries)
|
|
self._subscription: Any = None
|
|
self._starts: dict[str, float] = {}
|
|
|
|
def subscribe(self, event_bus: Any) -> None:
|
|
"""Subscribe to event bus for message_received, dvadasa_complete."""
|
|
|
|
def on_message(_event_type: str, payload: dict[str, Any]) -> None:
|
|
task_id = payload.get("task_id", "")
|
|
recipient = payload.get("recipient", "")
|
|
self._starts[f"{task_id}:{recipient}"] = time.monotonic()
|
|
|
|
def on_dvadasa(_event_type: str, payload: dict[str, Any]) -> None:
|
|
task_id = payload.get("task_id", "")
|
|
head_count = payload.get("head_count", 0)
|
|
self._entries.append(
|
|
TraceEntry(
|
|
event_type="dvadasa_complete",
|
|
task_id=task_id,
|
|
head_id=None,
|
|
latency_ms=None,
|
|
payload={"head_count": head_count},
|
|
)
|
|
)
|
|
|
|
try:
|
|
event_bus.subscribe("message_received", on_message)
|
|
event_bus.subscribe("dvadasa_complete", on_dvadasa)
|
|
except Exception as e:
|
|
logger.warning("Telemetry subscribe failed", extra={"error": str(e)})
|
|
|
|
def record_head_output(self, task_id: str, head_id: str, start: float | None = None) -> None:
|
|
"""Record head completion with optional latency."""
|
|
key = f"{task_id}:{head_id}"
|
|
end = time.monotonic()
|
|
latency_ms = (end - self._starts.pop(key, end)) * 1000 if start is None else (end - start) * 1000
|
|
self._entries.append(
|
|
TraceEntry(
|
|
event_type="head_output",
|
|
task_id=task_id,
|
|
head_id=head_id,
|
|
latency_ms=latency_ms,
|
|
payload={},
|
|
)
|
|
)
|
|
|
|
def get_traces(self, task_id: str | None = None, limit: int = 100) -> list[dict[str, Any]]:
|
|
"""Return traces, optionally filtered by task_id."""
|
|
out = []
|
|
for e in reversed(self._entries):
|
|
if task_id and e.task_id != task_id:
|
|
continue
|
|
out.append({
|
|
"event_type": e.event_type,
|
|
"task_id": e.task_id,
|
|
"head_id": e.head_id,
|
|
"latency_ms": e.latency_ms,
|
|
"payload": e.payload,
|
|
})
|
|
if len(out) >= limit:
|
|
break
|
|
return out
|
|
|
|
|
|
def get_tracer() -> TelemetryTracer | None:
    """Return the module-wide tracer, or ``None`` if none is currently set."""
    return _tracer
|
|
|
|
|
|
def set_tracer(t: TelemetryTracer | None) -> None:
    """Install *t* as the module-wide tracer; pass ``None`` to clear it."""
    global _tracer
    _tracer = t
|