Files
FusionAGI/fusionagi/governance/rate_limiter.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

39 lines
1.4 KiB
Python

"""Rate limiting: per agent or per tool; reject or queue if exceeded.
Optional; not wired to Executor or Orchestrator by default. Wire by calling
allow(key) before tool invocation or message routing and checking the result.
"""
import time
from collections import defaultdict
from fusionagi._logger import logger
class RateLimiter:
"""Simple in-memory rate limiter: max N calls per window_seconds per key."""
def __init__(self, max_calls: int = 60, window_seconds: float = 60.0) -> None:
self._max_calls = max_calls
self._window = window_seconds
self._calls: dict[str, list[float]] = defaultdict(list)
def allow(self, key: str) -> tuple[bool, str]:
"""Record a call for key; return (True, "") or (False, reason)."""
now = time.monotonic()
cutoff = now - self._window
self._calls[key] = [t for t in self._calls[key] if t > cutoff]
if len(self._calls[key]) >= self._max_calls:
reason = f"Rate limit exceeded for {key}"
logger.info("Rate limiter rejected", extra={"key": key, "reason": reason})
return False, reason
self._calls[key].append(now)
return True, ""
def reset(self, key: str | None = None) -> None:
"""Reset counts for key or all."""
if key is None:
self._calls.clear()
else:
self._calls.pop(key, None)