Files
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

107 lines
3.5 KiB
Python

"""Task schema: goal, constraints, priority, state with validation."""
from datetime import datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field, field_validator, model_validator
from fusionagi._time import utc_now
class TaskState(str, Enum):
"""Lifecycle state of a task."""
PENDING = "pending"
ACTIVE = "active"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TaskPriority(str, Enum):
"""Task priority level."""
LOW = "low"
NORMAL = "normal"
HIGH = "high"
CRITICAL = "critical"
# Valid state transitions for validation
VALID_TASK_TRANSITIONS: dict[TaskState, set[TaskState]] = {
TaskState.PENDING: {TaskState.ACTIVE, TaskState.CANCELLED},
TaskState.ACTIVE: {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED},
TaskState.COMPLETED: set(), # Terminal
TaskState.FAILED: {TaskState.PENDING, TaskState.CANCELLED}, # Allow retry
TaskState.CANCELLED: set(), # Terminal
}
class Task(BaseModel):
"""
Task representation for orchestration.
Includes validation for:
- Non-empty task_id and goal
- Timestamps for tracking
- State transition helpers
"""
task_id: str = Field(..., min_length=1, description="Unique task identifier")
goal: str = Field(..., min_length=1, description="High-level goal description")
constraints: list[str] = Field(default_factory=list, description="Constraints to respect")
priority: TaskPriority = Field(default=TaskPriority.NORMAL, description="Task priority")
state: TaskState = Field(default=TaskState.PENDING, description="Current task state")
metadata: dict[str, Any] = Field(default_factory=dict, description="Optional extra data")
created_at: datetime = Field(default_factory=utc_now, description="Creation timestamp")
updated_at: datetime | None = Field(default=None, description="Last update timestamp")
model_config = {"frozen": False}
@field_validator("task_id")
@classmethod
def validate_task_id(cls, v: str) -> str:
"""Validate task_id is not just whitespace."""
if not v.strip():
raise ValueError("task_id cannot be empty or whitespace")
return v
@field_validator("goal")
@classmethod
def validate_goal(cls, v: str) -> str:
"""Validate goal is not just whitespace."""
if not v.strip():
raise ValueError("goal cannot be empty or whitespace")
return v
def can_transition_to(self, new_state: TaskState) -> bool:
"""Check if transitioning to new_state is valid."""
if new_state == self.state:
return True
allowed = VALID_TASK_TRANSITIONS.get(self.state, set())
return new_state in allowed
def transition_to(self, new_state: TaskState) -> "Task":
"""
Create a new Task with the new state.
Raises:
ValueError: If the transition is not allowed.
"""
if not self.can_transition_to(new_state):
raise ValueError(
f"Invalid state transition: {self.state.value} -> {new_state.value}"
)
return self.model_copy(update={"state": new_state, "updated_at": utc_now()})
@property
def is_terminal(self) -> bool:
"""Check if task is in a terminal state."""
return self.state in (TaskState.COMPLETED, TaskState.CANCELLED)
@property
def is_active(self) -> bool:
"""Check if task is currently active."""
return self.state == TaskState.ACTIVE