Files
FusionAGI/fusionagi/core/goal_manager.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

83 lines
3.4 KiB
Python

"""Goal manager: objectives, priorities, constraints, time/compute budget for AGI."""
from typing import Any
from fusionagi.schemas.goal import Goal, GoalBudget, GoalStatus
from fusionagi._logger import logger
class GoalManager:
    """
    Manages goals with budgets. Tracks time/compute and can signal
    when a goal is over budget (abort or degrade).

    Usage is accumulated per goal id through ``record_time`` and
    ``record_compute``. After every recording the goal's budget is checked;
    once usage reaches a configured limit the goal is set to
    ``GoalStatus.BLOCKED`` and a warning is logged.
    """

    def __init__(self) -> None:
        """Create an empty manager with no goals and no recorded usage."""
        self._goals: dict[str, Goal] = {}
        # goal_id -> {"time_used": <seconds>, "compute_used": <units>}
        self._budget_used: dict[str, dict[str, float]] = {}

    @staticmethod
    def _new_usage() -> dict[str, float]:
        """Return a fresh, zeroed usage record."""
        return {"time_used": 0.0, "compute_used": 0.0}

    def _usage_for(self, goal_id: str) -> dict[str, float]:
        """Return the mutable usage record for ``goal_id``, creating it if missing."""
        return self._budget_used.setdefault(goal_id, self._new_usage())

    def add_goal(self, goal: Goal) -> None:
        """
        Register a goal.

        Registering a goal whose id is already known replaces the stored goal
        and resets its recorded usage to zero.
        """
        self._goals[goal.goal_id] = goal
        self._budget_used[goal.goal_id] = self._new_usage()
        logger.info(
            "Goal added",
            extra={"goal_id": goal.goal_id, "objective": goal.objective[:80]},
        )

    def get_goal(self, goal_id: str) -> Goal | None:
        """Return goal by id or None."""
        return self._goals.get(goal_id)

    def set_status(self, goal_id: str, status: GoalStatus) -> None:
        """
        Update goal status.

        The stored goal is replaced by a copy carrying the new status.
        Unknown goal ids are ignored.
        """
        goal = self._goals.get(goal_id)
        if not goal:
            return
        self._goals[goal_id] = goal.model_copy(update={"status": status})
        logger.debug(
            "Goal status set",
            extra={"goal_id": goal_id, "status": status.value},
        )

    def record_time(self, goal_id: str, seconds: float) -> None:
        """
        Record elapsed time for a goal; check budget.

        Usage is tracked even for ids that were never registered; the budget
        check is a no-op for those.
        """
        self._usage_for(goal_id)["time_used"] += seconds
        self._check_budget(goal_id)

    def record_compute(self, goal_id: str, units: float) -> None:
        """
        Record compute units for a goal; check budget.

        Usage is tracked even for ids that were never registered; the budget
        check is a no-op for those.
        """
        self._usage_for(goal_id)["compute_used"] += units
        self._check_budget(goal_id)

    def _check_budget(self, goal_id: str) -> None:
        """If over budget, set goal to blocked and log a warning."""
        # Delegate to is_over_budget so both code paths share one definition
        # of "over budget" and cannot drift apart.
        if not self.is_over_budget(goal_id):
            return
        self.set_status(goal_id, GoalStatus.BLOCKED)
        logger.warning(
            "Goal over budget",
            extra={"goal_id": goal_id, "used": self._budget_used.get(goal_id, {})},
        )

    def is_over_budget(self, goal_id: str) -> bool:
        """
        Return True if goal has reached or exceeded its budget.

        A limit counts as hit when recorded usage is greater than or equal to
        it. Unknown goals, goals without a budget, and limits set to ``None``
        never count as over budget.
        """
        goal = self._goals.get(goal_id)
        if not goal or not goal.budget:
            return False
        used = self._budget_used.get(goal_id, {})
        time_limit = goal.budget.time_seconds
        if time_limit is not None and used.get("time_used", 0.0) >= time_limit:
            return True
        compute_limit = goal.budget.compute_budget
        return (
            compute_limit is not None
            and used.get("compute_used", 0.0) >= compute_limit
        )

    def list_goals(self, status: GoalStatus | None = None) -> list[Goal]:
        """Return goals, optionally filtered by status."""
        if status is None:
            return list(self._goals.values())
        return [goal for goal in self._goals.values() if goal.status == status]