Files
FusionAGI/fusionagi/schemas/messages.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

95 lines
3.3 KiB
Python

"""Agent message schema: sender, recipient, intent, payload, confidence/uncertainty."""
from datetime import datetime
from typing import Any
from pydantic import BaseModel, Field, field_validator
from fusionagi._time import utc_now
class AgentMessage(BaseModel):
    """
    Structured message between agents.
    Includes validation for:
    - Non-empty sender, recipient, and intent
    - Confidence in valid [0, 1] range
    """

    # NOTE(review): pydantic surfaces the class docstring as the model's
    # JSON-schema description, so its wording above is deliberately left as-is.

    # --- Routing: who sent the message, who receives it, and what it means ---
    sender: str = Field(
        ...,
        min_length=1,
        description="Agent id of sender",
    )
    recipient: str = Field(
        ...,
        min_length=1,
        description="Agent id of recipient",
    )
    intent: str = Field(
        ...,
        min_length=1,
        description="Message intent e.g. plan_ready, execute_step",
    )

    # --- Content and optional self-assessment ---
    # default_factory gives every message its own fresh dict.
    payload: dict[str, Any] = Field(
        default_factory=dict,
        description="Message payload",
    )
    confidence: float | None = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Optional confidence [0,1]",
    )
    uncertainty: str | None = Field(
        default=None,
        description="Optional uncertainty note",
    )

    # Filled in by utc_now() when the caller does not supply a timestamp.
    timestamp: datetime = Field(
        default_factory=utc_now,
        description="Message timestamp",
    )

    @field_validator("sender", "recipient", "intent")
    @classmethod
    def validate_non_whitespace(cls, v: str) -> str:
        """Reject routing fields that contain nothing but whitespace.

        ``min_length=1`` alone would still accept a value such as ``"   "``.
        The accepted value is returned untouched (it is not stripped).

        Raises:
            ValueError: If the value is empty once whitespace is stripped.
        """
        if v.strip():
            return v
        raise ValueError("Field cannot be empty or whitespace")

    @field_validator("confidence")
    @classmethod
    def validate_confidence(cls, v: float | None) -> float | None:
        """Check that a supplied confidence lies between 0 and 1 inclusive.

        ``None`` means "no confidence given" and is passed through. The same
        bounds are also declared on the field via ``ge``/``le``; this
        validator repeats the check explicitly.

        Raises:
            ValueError: If the value is below 0 or above 1.
        """
        if v is None:
            return None
        if v > 1.0 or v < 0.0:
            raise ValueError("confidence must be between 0 and 1")
        return v
class AgentMessageEnvelope(BaseModel):
    """
    Top-level envelope for agent messages; can carry task context.
    The envelope wraps a message and provides additional context:
    - task_id: Associates the message with a specific task
    - correlation_id: Enables request/response tracking
    """

    # NOTE(review): pydantic surfaces the class docstring as the model's
    # JSON-schema description, so its wording above is deliberately left as-is.

    message: AgentMessage = Field(
        ...,
        description="The wrapped message",
    )
    task_id: str | None = Field(
        default=None,
        description="Associated task id if any",
    )
    correlation_id: str | None = Field(
        default=None,
        description="For request/response correlation",
    )

    # The three read-only properties below forward to the wrapped message so
    # callers can write ``envelope.sender`` instead of ``envelope.message.sender``.

    @property
    def sender(self) -> str:
        """Return the sender id of the wrapped message."""
        wrapped = self.message
        return wrapped.sender

    @property
    def recipient(self) -> str:
        """Return the recipient id of the wrapped message."""
        wrapped = self.message
        return wrapped.recipient

    @property
    def intent(self) -> str:
        """Return the intent of the wrapped message."""
        wrapped = self.message
        return wrapped.intent

    def create_response(
        self,
        intent: str,
        payload: dict[str, Any] | None = None,
        confidence: float | None = None,
    ) -> "AgentMessageEnvelope":
        """Build the reply envelope for this envelope's message.

        The reply is addressed back to the original sender and is sent from
        the original recipient. ``task_id`` and ``correlation_id`` are copied
        over unchanged.

        Args:
            intent: Intent of the reply message.
            payload: Reply payload; ``None`` or an empty dict becomes a new
                empty dict.
            confidence: Optional confidence for the reply.

        Returns:
            A new envelope wrapping the reply message.
        """
        original = self.message
        reply_payload = payload if payload else {}

        # Swap the two endpoints: the original recipient is now the sender.
        reply = AgentMessage(
            sender=original.recipient,
            recipient=original.sender,
            intent=intent,
            payload=reply_payload,
            confidence=confidence,
        )
        return AgentMessageEnvelope(
            message=reply,
            task_id=self.task_id,
            correlation_id=self.correlation_id,
        )