Files
FusionAGI/fusionagi/core/event_bus.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

78 lines
2.9 KiB
Python

"""In-process pub/sub event bus for task lifecycle and agent messages."""
from collections import defaultdict, deque
from typing import Any, Callable
from fusionagi._logger import logger
from fusionagi._time import utc_now_iso
# Type for event handlers: (event_type, payload) -> None
EventHandler = Callable[[str, dict[str, Any]], None]
class EventBus:
"""Simple in-process event bus: event type -> list of handlers; optional event history."""
def __init__(self, history_size: int = 0) -> None:
"""
Initialize event bus.
Args:
history_size: If > 0, keep the last N events for get_recent_events().
"""
self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
self._history_size = max(0, history_size)
self._history: deque[dict[str, Any]] = deque(maxlen=self._history_size) if self._history_size else deque()
def subscribe(self, event_type: str, handler: EventHandler) -> None:
"""Register a handler for an event type."""
self._handlers[event_type].append(handler)
def unsubscribe(self, event_type: str, handler: EventHandler) -> None:
"""Remove a handler for an event type."""
if event_type in self._handlers:
try:
self._handlers[event_type].remove(handler)
except ValueError:
pass
def publish(self, event_type: str, payload: dict[str, Any] | None = None) -> None:
"""Publish an event; all registered handlers are invoked."""
payload = payload or {}
if self._history_size > 0:
self._history.append({
"event_type": event_type,
"payload": dict(payload),
"timestamp": utc_now_iso(),
})
task_id = payload.get("task_id", "")
logger.debug(
"Event published",
extra={"event_type": event_type, "task_id": task_id},
)
for h in self._handlers[event_type][:]:
try:
h(event_type, payload)
except Exception:
# Log and continue so one handler failure doesn't block others
logger.exception(
"Event handler failed",
extra={"event_type": event_type},
)
def get_recent_events(self, limit: int = 50) -> list[dict[str, Any]]:
"""Return the most recent events (oldest first in slice). Only available if history_size > 0."""
if self._history_size == 0:
return []
events = list(self._history)
return events[-limit:] if limit else events
def clear(self, event_type: str | None = None) -> None:
"""Clear handlers for one event type or all; clear history when clearing all."""
if event_type is None:
self._handlers.clear()
if self._history:
self._history.clear()
elif event_type in self._handlers:
del self._handlers[event_type]