Files
FusionAGI/fusionagi/reasoning/recomposition.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

50 lines
1.6 KiB
Python

"""Dynamic recomposition: build higher-order insights with traceability."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from fusionagi.schemas.atomic import AtomicSemanticUnit
from fusionagi.reasoning.tot import ThoughtNode
@dataclass
class RecomposedResponse:
    """Result of a recomposition pass, with links back to its source units.

    Every field has a default, so an empty response can be created with no
    arguments; list and dict fields get a fresh container per instance.

    Attributes:
        summary: Short text summarising the combined insights.
        key_claims: Individual claim strings extracted for the response.
        unit_refs: Identifiers of the atomic units the response traces to.
        confidence: Numeric confidence attached to the response.
        metadata: Free-form extra information about how it was produced.
    """

    summary: str = ""
    key_claims: list[str] = field(default_factory=list)
    unit_refs: list[str] = field(default_factory=list)
    confidence: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)
def recompose(
    thought_nodes: list[ThoughtNode],
    atomic_units: list[AtomicSemanticUnit],
) -> RecomposedResponse:
    """Build higher-order insights from selected thought nodes.

    Args:
        thought_nodes: Nodes to combine. Each node's ``thought`` text,
            ``unit_refs`` ids and ``score`` are read.
        atomic_units: Accepted for interface compatibility; this function
            does not currently read it.

    Returns:
        A ``RecomposedResponse`` where:

        * ``summary`` joins the first three non-empty thoughts (each cut to
          200 characters) with spaces, followed by ``" [truncated]"`` when
          more than three were available, or ``"No insights."`` when none.
        * ``key_claims`` holds up to ten thoughts, each cut to 150 characters.
        * ``unit_refs`` lists every referenced unit id once, in the order it
          was first encountered.
        * ``confidence`` is the mean node score clamped to ``[0.0, 1.0]``
          (``0.0`` for an empty node list).
        * ``metadata`` records ``node_count`` and ``unit_count``.
    """
    # Dict keys de-duplicate while keeping first-seen order. A set would also
    # de-duplicate, but its iteration order for strings changes between
    # interpreter runs, which made the returned unit_refs order unstable.
    ordered_refs: dict[str, None] = {}
    key_claims: list[str] = []
    summaries: list[str] = []

    for node in thought_nodes:
        if node.thought:
            summaries.append(node.thought[:200])
            key_claims.append(node.thought[:150])
        # Refs are collected even from nodes whose thought text is empty.
        ordered_refs.update(dict.fromkeys(node.unit_refs))

    summary = " ".join(summaries[:3]) if summaries else "No insights."
    if len(summaries) > 3:
        # Signal that only the first three thoughts made it into the summary.
        summary += " [truncated]"

    if thought_nodes:
        avg_score = sum(n.score for n in thought_nodes) / len(thought_nodes)
    else:
        avg_score = 0.0

    return RecomposedResponse(
        summary=summary,
        key_claims=key_claims[:10],
        unit_refs=list(ordered_refs),
        # Clamp on both sides so negative scores cannot produce a negative
        # confidence and large scores cannot exceed 1.0.
        confidence=max(0.0, min(1.0, avg_score)),
        metadata={
            "node_count": len(thought_nodes),
            "unit_count": len(ordered_refs),
        },
    )