83 lines
3.4 KiB
Python
83 lines
3.4 KiB
Python
"""Goal manager: objectives, priorities, constraints, time/compute budget for AGI."""
|
|
|
|
from typing import Any
|
|
|
|
from fusionagi.schemas.goal import Goal, GoalBudget, GoalStatus
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
class GoalManager:
    """Registry of goals with per-goal time/compute usage accounting.

    Goals are stored by ``goal_id``. Usage is accumulated through
    :meth:`record_time` and :meth:`record_compute`; after every update the
    goal's budget (when it has one) is re-checked, and the goal is switched to
    ``GoalStatus.BLOCKED`` once any configured limit has been reached.
    """

    def __init__(self) -> None:
        """Create a manager with no goals and no recorded usage."""
        self._goals: dict[str, Goal] = {}
        # goal_id -> {"time_used": <float>, "compute_used": <float>}
        self._budget_used: dict[str, dict[str, float]] = {}

    @staticmethod
    def _new_usage() -> dict[str, float]:
        """Return a fresh, zeroed usage record."""
        return {"time_used": 0.0, "compute_used": 0.0}

    def _usage_for(self, goal_id: str) -> dict[str, float]:
        """Return the usage record for ``goal_id``, creating it if missing."""
        if goal_id not in self._budget_used:
            self._budget_used[goal_id] = self._new_usage()
        return self._budget_used[goal_id]

    def add_goal(self, goal: Goal) -> None:
        """Register ``goal`` under its ``goal_id``.

        Adding a goal whose id is already known replaces the stored goal and
        resets its usage counters to zero.
        """
        self._goals[goal.goal_id] = goal
        self._budget_used[goal.goal_id] = self._new_usage()
        logger.info(
            "Goal added",
            # Only the first 80 characters of the objective are logged.
            extra={"goal_id": goal.goal_id, "objective": goal.objective[:80]},
        )

    def get_goal(self, goal_id: str) -> Goal | None:
        """Return the goal registered under ``goal_id``, or ``None``."""
        return self._goals.get(goal_id)

    def set_status(self, goal_id: str, status: GoalStatus) -> None:
        """Set the status of a registered goal.

        Unknown ids are ignored. The stored goal is replaced by an updated
        copy rather than mutated in place.
        """
        goal = self._goals.get(goal_id)
        if goal is None:
            return
        self._goals[goal_id] = goal.model_copy(update={"status": status})
        logger.debug(
            "Goal status set",
            extra={"goal_id": goal_id, "status": status.value},
        )

    def record_time(self, goal_id: str, seconds: float) -> None:
        """Add ``seconds`` to the time used by ``goal_id`` and re-check its budget.

        Usage is tracked even when no goal is registered under ``goal_id``.
        """
        self._usage_for(goal_id)["time_used"] += seconds
        self._check_budget(goal_id)

    def record_compute(self, goal_id: str, units: float) -> None:
        """Add ``units`` to the compute used by ``goal_id`` and re-check its budget.

        Usage is tracked even when no goal is registered under ``goal_id``.
        """
        self._usage_for(goal_id)["compute_used"] += units
        self._check_budget(goal_id)

    def _check_budget(self, goal_id: str) -> None:
        """Mark the goal ``BLOCKED`` and log a warning if it is over budget."""
        # Single source of truth: the same predicate callers see publicly.
        if not self.is_over_budget(goal_id):
            return
        self.set_status(goal_id, GoalStatus.BLOCKED)
        # Log a snapshot: the live usage dict keeps changing on later
        # record_* calls, and a log record may outlive this call.
        usage_snapshot = dict(self._budget_used.get(goal_id, {}))
        logger.warning(
            "Goal over budget",
            extra={"goal_id": goal_id, "used": usage_snapshot},
        )

    def is_over_budget(self, goal_id: str) -> bool:
        """Return True if the goal has reached or passed any budget limit.

        A limit counts as hit when usage is greater than or equal to it.
        Limits set to ``None`` are not enforced. Unknown goals and goals
        without a budget are never over budget.
        """
        goal = self._goals.get(goal_id)
        if goal is None or not goal.budget:
            return False
        budget = goal.budget
        used = self._budget_used.get(goal_id, {})

        time_limit = budget.time_seconds
        if time_limit is not None and used.get("time_used", 0) >= time_limit:
            return True

        compute_limit = budget.compute_budget
        return compute_limit is not None and used.get("compute_used", 0) >= compute_limit

    def list_goals(self, status: GoalStatus | None = None) -> list[Goal]:
        """Return all goals in insertion order, optionally filtered by ``status``."""
        if status is None:
            return list(self._goals.values())
        return [goal for goal in self._goals.values() if goal.status == status]
|