78 lines
2.9 KiB
Python
78 lines
2.9 KiB
Python
"""In-process pub/sub event bus for task lifecycle and agent messages."""
|
|
|
|
from collections import defaultdict, deque
|
|
from typing import Any, Callable
|
|
|
|
from fusionagi._logger import logger
|
|
from fusionagi._time import utc_now_iso
|
|
|
|
# Type for event handlers: (event_type, payload) -> None
|
|
EventHandler = Callable[[str, dict[str, Any]], None]
|
|
|
|
|
|
class EventBus:
|
|
"""Simple in-process event bus: event type -> list of handlers; optional event history."""
|
|
|
|
def __init__(self, history_size: int = 0) -> None:
|
|
"""
|
|
Initialize event bus.
|
|
|
|
Args:
|
|
history_size: If > 0, keep the last N events for get_recent_events().
|
|
"""
|
|
self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
|
|
self._history_size = max(0, history_size)
|
|
self._history: deque[dict[str, Any]] = deque(maxlen=self._history_size) if self._history_size else deque()
|
|
|
|
def subscribe(self, event_type: str, handler: EventHandler) -> None:
|
|
"""Register a handler for an event type."""
|
|
self._handlers[event_type].append(handler)
|
|
|
|
def unsubscribe(self, event_type: str, handler: EventHandler) -> None:
|
|
"""Remove a handler for an event type."""
|
|
if event_type in self._handlers:
|
|
try:
|
|
self._handlers[event_type].remove(handler)
|
|
except ValueError:
|
|
pass
|
|
|
|
def publish(self, event_type: str, payload: dict[str, Any] | None = None) -> None:
|
|
"""Publish an event; all registered handlers are invoked."""
|
|
payload = payload or {}
|
|
if self._history_size > 0:
|
|
self._history.append({
|
|
"event_type": event_type,
|
|
"payload": dict(payload),
|
|
"timestamp": utc_now_iso(),
|
|
})
|
|
task_id = payload.get("task_id", "")
|
|
logger.debug(
|
|
"Event published",
|
|
extra={"event_type": event_type, "task_id": task_id},
|
|
)
|
|
for h in self._handlers[event_type][:]:
|
|
try:
|
|
h(event_type, payload)
|
|
except Exception:
|
|
# Log and continue so one handler failure doesn't block others
|
|
logger.exception(
|
|
"Event handler failed",
|
|
extra={"event_type": event_type},
|
|
)
|
|
|
|
def get_recent_events(self, limit: int = 50) -> list[dict[str, Any]]:
|
|
"""Return the most recent events (oldest first in slice). Only available if history_size > 0."""
|
|
if self._history_size == 0:
|
|
return []
|
|
events = list(self._history)
|
|
return events[-limit:] if limit else events
|
|
|
|
def clear(self, event_type: str | None = None) -> None:
|
|
"""Clear handlers for one event type or all; clear history when clearing all."""
|
|
if event_type is None:
|
|
self._handlers.clear()
|
|
if self._history:
|
|
self._history.clear()
|
|
elif event_type in self._handlers:
|
|
del self._handlers[event_type]
|