39 lines
1.4 KiB
Python
39 lines
1.4 KiB
Python
"""Rate limiting: per agent or per tool; reject or queue if exceeded.

Optional; not wired to Executor or Orchestrator by default. Wire it in by
calling allow(key) before tool invocation or message routing and checking
the result.
"""
|
|
|
|
import time
|
|
from collections import defaultdict
|
|
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
class RateLimiter:
|
|
"""Simple in-memory rate limiter: max N calls per window_seconds per key."""
|
|
|
|
def __init__(self, max_calls: int = 60, window_seconds: float = 60.0) -> None:
|
|
self._max_calls = max_calls
|
|
self._window = window_seconds
|
|
self._calls: dict[str, list[float]] = defaultdict(list)
|
|
|
|
def allow(self, key: str) -> tuple[bool, str]:
|
|
"""Record a call for key; return (True, "") or (False, reason)."""
|
|
now = time.monotonic()
|
|
cutoff = now - self._window
|
|
self._calls[key] = [t for t in self._calls[key] if t > cutoff]
|
|
if len(self._calls[key]) >= self._max_calls:
|
|
reason = f"Rate limit exceeded for {key}"
|
|
logger.info("Rate limiter rejected", extra={"key": key, "reason": reason})
|
|
return False, reason
|
|
self._calls[key].append(now)
|
|
return True, ""
|
|
|
|
def reset(self, key: str | None = None) -> None:
|
|
"""Reset counts for key or all."""
|
|
if key is None:
|
|
self._calls.clear()
|
|
else:
|
|
self._calls.pop(key, None)
|