95 lines
3.3 KiB
Python
95 lines
3.3 KiB
Python
"""Agent message schema: sender, recipient, intent, payload, confidence/uncertainty."""
|
|
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
from fusionagi._time import utc_now
|
|
|
|
|
|
class AgentMessage(BaseModel):
|
|
"""
|
|
Structured message between agents.
|
|
|
|
Includes validation for:
|
|
- Non-empty sender, recipient, and intent
|
|
- Confidence in valid [0, 1] range
|
|
"""
|
|
|
|
sender: str = Field(..., min_length=1, description="Agent id of sender")
|
|
recipient: str = Field(..., min_length=1, description="Agent id of recipient")
|
|
intent: str = Field(..., min_length=1, description="Message intent e.g. plan_ready, execute_step")
|
|
payload: dict[str, Any] = Field(default_factory=dict, description="Message payload")
|
|
confidence: float | None = Field(default=None, ge=0.0, le=1.0, description="Optional confidence [0,1]")
|
|
uncertainty: str | None = Field(default=None, description="Optional uncertainty note")
|
|
timestamp: datetime = Field(default_factory=utc_now, description="Message timestamp")
|
|
|
|
@field_validator("sender", "recipient", "intent")
|
|
@classmethod
|
|
def validate_non_whitespace(cls, v: str) -> str:
|
|
"""Validate string fields are not just whitespace."""
|
|
if not v.strip():
|
|
raise ValueError("Field cannot be empty or whitespace")
|
|
return v
|
|
|
|
@field_validator("confidence")
|
|
@classmethod
|
|
def validate_confidence(cls, v: float | None) -> float | None:
|
|
"""Validate confidence is in [0, 1] range."""
|
|
if v is not None and (v < 0.0 or v > 1.0):
|
|
raise ValueError("confidence must be between 0 and 1")
|
|
return v
|
|
|
|
|
|
class AgentMessageEnvelope(BaseModel):
|
|
"""
|
|
Top-level envelope for agent messages; can carry task context.
|
|
|
|
The envelope wraps a message and provides additional context:
|
|
- task_id: Associates the message with a specific task
|
|
- correlation_id: Enables request/response tracking
|
|
"""
|
|
|
|
message: AgentMessage = Field(..., description="The wrapped message")
|
|
task_id: str | None = Field(default=None, description="Associated task id if any")
|
|
correlation_id: str | None = Field(default=None, description="For request/response correlation")
|
|
|
|
@property
|
|
def sender(self) -> str:
|
|
"""Convenience accessor for message sender."""
|
|
return self.message.sender
|
|
|
|
@property
|
|
def recipient(self) -> str:
|
|
"""Convenience accessor for message recipient."""
|
|
return self.message.recipient
|
|
|
|
@property
|
|
def intent(self) -> str:
|
|
"""Convenience accessor for message intent."""
|
|
return self.message.intent
|
|
|
|
def create_response(
|
|
self,
|
|
intent: str,
|
|
payload: dict[str, Any] | None = None,
|
|
confidence: float | None = None,
|
|
) -> "AgentMessageEnvelope":
|
|
"""
|
|
Create a response envelope to this message.
|
|
|
|
Swaps sender/recipient and preserves task_id and correlation_id.
|
|
"""
|
|
return AgentMessageEnvelope(
|
|
message=AgentMessage(
|
|
sender=self.message.recipient,
|
|
recipient=self.message.sender,
|
|
intent=intent,
|
|
payload=payload or {},
|
|
confidence=confidence,
|
|
),
|
|
task_id=self.task_id,
|
|
correlation_id=self.correlation_id,
|
|
)
|