"""In-process pub/sub event bus for task lifecycle and agent messages.""" from collections import defaultdict, deque from typing import Any, Callable from fusionagi._logger import logger from fusionagi._time import utc_now_iso # Type for event handlers: (event_type, payload) -> None EventHandler = Callable[[str, dict[str, Any]], None] class EventBus: """Simple in-process event bus: event type -> list of handlers; optional event history.""" def __init__(self, history_size: int = 0) -> None: """ Initialize event bus. Args: history_size: If > 0, keep the last N events for get_recent_events(). """ self._handlers: dict[str, list[EventHandler]] = defaultdict(list) self._history_size = max(0, history_size) self._history: deque[dict[str, Any]] = deque(maxlen=self._history_size) if self._history_size else deque() def subscribe(self, event_type: str, handler: EventHandler) -> None: """Register a handler for an event type.""" self._handlers[event_type].append(handler) def unsubscribe(self, event_type: str, handler: EventHandler) -> None: """Remove a handler for an event type.""" if event_type in self._handlers: try: self._handlers[event_type].remove(handler) except ValueError: pass def publish(self, event_type: str, payload: dict[str, Any] | None = None) -> None: """Publish an event; all registered handlers are invoked.""" payload = payload or {} if self._history_size > 0: self._history.append({ "event_type": event_type, "payload": dict(payload), "timestamp": utc_now_iso(), }) task_id = payload.get("task_id", "") logger.debug( "Event published", extra={"event_type": event_type, "task_id": task_id}, ) for h in self._handlers[event_type][:]: try: h(event_type, payload) except Exception: # Log and continue so one handler failure doesn't block others logger.exception( "Event handler failed", extra={"event_type": event_type}, ) def get_recent_events(self, limit: int = 50) -> list[dict[str, Any]]: """Return the most recent events (oldest first in slice). Only available if history_size > 0.""" if self._history_size == 0: return [] events = list(self._history) return events[-limit:] if limit else events def clear(self, event_type: str | None = None) -> None: """Clear handlers for one event type or all; clear history when clearing all.""" if event_type is None: self._handlers.clear() if self._history: self._history.clear() elif event_type in self._handlers: del self._handlers[event_type]