107 lines
3.5 KiB
Python
107 lines
3.5 KiB
Python
"""Task schema: goal, constraints, priority, state with validation."""
|
|
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
from pydantic import BaseModel, Field, field_validator, model_validator
|
|
|
|
from fusionagi._time import utc_now
|
|
|
|
|
|
class TaskState(str, Enum):
    """Lifecycle state of a task.

    Members also subclass ``str``, so each member compares equal to its
    lowercase string value (e.g. ``TaskState.ACTIVE == "active"``).
    """

    PENDING = "pending"
    ACTIVE = "active"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
|
|
|
|
|
class TaskPriority(str, Enum):
    """Task priority level.

    Members also subclass ``str``, so each member compares equal to its
    lowercase string value (e.g. ``TaskPriority.HIGH == "high"``).
    """

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    CRITICAL = "critical"
|
|
|
|
|
|
# Allowed state transitions, keyed by the current state. A state mapped to an
# empty set is terminal: no other state may follow it.
VALID_TASK_TRANSITIONS: dict[TaskState, set[TaskState]] = {
    TaskState.PENDING: {TaskState.ACTIVE, TaskState.CANCELLED},
    TaskState.ACTIVE: {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED},
    TaskState.COMPLETED: set(),  # Terminal
    TaskState.FAILED: {TaskState.PENDING, TaskState.CANCELLED},  # Allow retry
    TaskState.CANCELLED: set(),  # Terminal
}
|
|
|
|
|
|
class Task(BaseModel):
    """
    Task representation for orchestration.

    Includes validation for:
    - Non-empty task_id and goal (empty and whitespace-only values are rejected)
    - Timestamps for tracking
    - State transition helpers
    """

    task_id: str = Field(..., min_length=1, description="Unique task identifier")
    goal: str = Field(..., min_length=1, description="High-level goal description")
    constraints: list[str] = Field(default_factory=list, description="Constraints to respect")
    priority: TaskPriority = Field(default=TaskPriority.NORMAL, description="Task priority")
    state: TaskState = Field(default=TaskState.PENDING, description="Current task state")
    metadata: dict[str, Any] = Field(default_factory=dict, description="Optional extra data")
    created_at: datetime = Field(default_factory=utc_now, description="Creation timestamp")
    updated_at: datetime | None = Field(default=None, description="Last update timestamp")

    model_config = {"frozen": False}

    @field_validator("task_id")
    @classmethod
    def validate_task_id(cls, v: str) -> str:
        """Validate task_id is not just whitespace.

        ``min_length=1`` on the field only rejects the empty string; this
        validator additionally rejects values such as ``"   "``.

        Returns:
            The value unchanged (it is not stripped).

        Raises:
            ValueError: If the value is empty after stripping whitespace.
        """
        if not v.strip():
            raise ValueError("task_id cannot be empty or whitespace")
        return v

    @field_validator("goal")
    @classmethod
    def validate_goal(cls, v: str) -> str:
        """Validate goal is not just whitespace.

        ``min_length=1`` on the field only rejects the empty string; this
        validator additionally rejects values such as ``"   "``.

        Returns:
            The value unchanged (it is not stripped).

        Raises:
            ValueError: If the value is empty after stripping whitespace.
        """
        if not v.strip():
            raise ValueError("goal cannot be empty or whitespace")
        return v

    def can_transition_to(self, new_state: TaskState) -> bool:
        """Check if transitioning to new_state is valid.

        Staying in the current state is always considered valid, including
        for terminal states. Otherwise the answer comes from
        ``VALID_TASK_TRANSITIONS``.
        """
        if new_state == self.state:
            return True
        allowed = VALID_TASK_TRANSITIONS.get(self.state, set())
        return new_state in allowed

    def transition_to(self, new_state: TaskState) -> "Task":
        """
        Create a new Task with the new state.

        The returned task is a copy of this one with ``state`` replaced and
        ``updated_at`` set to the current ``utc_now()``; this instance is not
        modified.

        Args:
            new_state: Target state. A raw string value (e.g. ``"active"``)
                is accepted and normalised to the matching ``TaskState``.

        Raises:
            ValueError: If the transition is not allowed, or if ``new_state``
                is not a valid ``TaskState`` value.
        """
        # model_copy(update=...) does not run validation, so normalise here to
        # guarantee the copy always stores a real TaskState member (and that
        # ``.value`` below is safe even when a plain string was passed).
        target = TaskState(new_state)
        if not self.can_transition_to(target):
            raise ValueError(
                f"Invalid state transition: {self.state.value} -> {target.value}"
            )
        return self.model_copy(update={"state": target, "updated_at": utc_now()})

    @property
    def is_terminal(self) -> bool:
        """Check if task is in a terminal state.

        Only COMPLETED and CANCELLED count as terminal; FAILED does not,
        because a failed task may still be moved back to PENDING for a retry.
        """
        return self.state in (TaskState.COMPLETED, TaskState.CANCELLED)

    @property
    def is_active(self) -> bool:
        """Check if task is currently active."""
        return self.state == TaskState.ACTIVE
|