"""
Native reasoning engine: symbolic, rule-based analysis independent of external LLMs.
Produces structured HeadOutput from prompt analysis using:
- Keyword and pattern extraction
- Head-specific domain logic
- Persona-driven synthesis
- No external API calls
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
from fusionagi.config.head_personas import get_persona
from fusionagi.schemas.grounding import Citation
from fusionagi.schemas.head import HeadClaim, HeadId, HeadOutput, HeadRisk
@dataclass
class PromptAnalysis:
    """Structured analysis of a user prompt from native reasoning."""
    # Coarse intent label: "question", "statement", or "request".
    intent: str = ""
    # Entity-like phrases (quoted spans and Title Case runs), deduped, max 10.
    entities: list[str] = field(default_factory=list)
    # Constraint clauses captured after modal verbs ("must", "should", ...).
    constraints: list[str] = field(default_factory=list)
    # Explicit questions found in the prompt (fragments re-suffixed with "?").
    questions: list[str] = field(default_factory=list)
    # Per-head relevance score in [0.0, 1.0], keyed by HeadId.value.
    domain_signals: dict[str, float] = field(default_factory=dict)
    # Lowercase word tokens (2+ alphanumeric chars) extracted from the prompt.
    keywords: set[str] = field(default_factory=set)
# Domain keywords per head: any overlap with the prompt's word tokens boosts
# that head's relevance signal (see analyze_prompt) and shapes derived claims.
# WITNESS is deliberately absent: it never produces a HeadOutput.
HEAD_DOMAIN_KEYWORDS: dict[HeadId, set[str]] = {
    HeadId.LOGIC: {"logic", "contradiction", "correct", "valid", "proof", "assumption", "therefore", "implies"},
    HeadId.RESEARCH: {"source", "cite", "reference", "evidence", "study", "paper", "find", "search"},
    HeadId.SYSTEMS: {"architecture", "scalability", "dependency", "system", "design", "component", "service"},
    HeadId.STRATEGY: {"strategy", "roadmap", "priority", "tradeoff", "plan", "goal", "long-term"},
    HeadId.PRODUCT: {"user", "ux", "product", "flow", "design", "experience", "interface"},
    HeadId.SECURITY: {"security", "auth", "threat", "vulnerability", "secret", "encrypt", "attack"},
    HeadId.SAFETY: {"safety", "harm", "policy", "ethical", "comply", "risk", "prevent"},
    HeadId.RELIABILITY: {"reliability", "slo", "failover", "observability", "test", "load", "uptime"},
    HeadId.COST: {"cost", "budget", "performance", "cache", "token", "efficient", "expensive"},
    HeadId.DATA: {"data", "schema", "privacy", "retention", "memory", "storage", "database"},
    HeadId.DEVEX: {"dev", "ci", "cd", "test", "tooling", "local", "developer", "workflow"},
}
def _extract_content(text: str) -> str:
"""Normalize and extract analyzable content from prompt."""
if not text:
return ""
# Collapse whitespace, strip
return " ".join(text.split()).strip()
def analyze_prompt(prompt: str) -> PromptAnalysis:
    """
    Analyze a prompt using pattern matching and keyword extraction.

    Pure symbolic reasoning; no external APIs. Produces a PromptAnalysis
    with intent, explicit questions, entity-like phrases, constraint
    clauses, and a per-head domain-relevance score in [0.0, 1.0].
    """
    content = _extract_content(prompt).lower()
    # Lowercase word tokens (2+ alphanumeric chars) drive keyword overlap below.
    words = set(re.findall(r"\b[a-z0-9]{2,}\b", content))
    analysis = PromptAnalysis(keywords=words)

    # Intent: question vs statement vs request.
    if "?" in prompt:
        analysis.intent = "question"
        # Extract each explicit question (text preceding a run of '?').
        q_parts = re.split(r"\?+", prompt)
        for part in q_parts[:-1]:
            q = part.strip()
            if len(q) > 10:  # skip trivially short fragments
                analysis.questions.append(q + "?")
    elif words & {"how", "what", "why", "when", "where", "who"}:
        # BUGFIX: match whole tokens, not substrings. The previous
        # `any(w in content for w in [...])` misfired on words that merely
        # contain a question word, e.g. "show" (how), "somewhat" (what),
        # "nowhere" (where), classifying plain statements as questions.
        analysis.intent = "question"
    else:
        analysis.intent = "statement" if len(prompt.split()) > 5 else "request"

    # Entity-like phrases: quoted spans plus Title Case runs, deduped
    # (dict.fromkeys preserves order), capped at 10.
    entities = re.findall(r'"([^"]+)"', prompt)
    entities += re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", prompt)
    analysis.entities = list(dict.fromkeys(e for e in entities if len(e) > 2))[:10]

    # Constraint signals: modal phrases captured up to the next period or
    # end of string, case-insensitively.
    constraint_patterns = [
        r"must\s+(\w[\w\s]+?)(?:\.|$)",
        r"should\s+(\w[\w\s]+?)(?:\.|$)",
        r"cannot\s+(\w[\w\s]+?)(?:\.|$)",
        r"require[sd]?\s+(\w[\w\s]+?)(?:\.|$)",
        r"constraint[s]?:\s*(\w[\w\s]+?)(?:\.|$)",
    ]
    for pat in constraint_patterns:
        for m in re.finditer(pat, prompt, re.I):
            analysis.constraints.append(m.group(1).strip())

    # Domain relevance per head: fraction of domain keywords present in the
    # prompt, scaled x3 so modest overlap registers, clamped to 1.0.
    for hid, keywords in HEAD_DOMAIN_KEYWORDS.items():
        if hid == HeadId.WITNESS:
            continue  # Witness never produces a HeadOutput
        overlap = len(words & keywords) / max(len(keywords), 1)
        analysis.domain_signals[hid.value] = min(1.0, overlap * 3)
    return analysis
def _derive_claims_for_head(
    head_id: HeadId,
    analysis: PromptAnalysis,
    prompt: str,
    semantic_facts: list[dict[str, Any]] | None = None,
) -> list[HeadClaim]:
    """
    Derive atomic claims from the prompt analysis for one head's domain.

    Args:
        head_id: Head whose domain logic shapes the claims.
        analysis: Structured prompt analysis (intent, entities, signals).
        prompt: Raw prompt text, cited as evidence for the base claim.
        semantic_facts: Optional stored facts; up to three become
            memory-grounded claims.

    Returns:
        List of HeadClaim, always starting with a base summary claim.
    """
    # NOTE: the previous version fetched get_persona(head_id) into an
    # unused local; the dead call has been removed.
    claims: list[HeadClaim] = []
    relevance = analysis.domain_signals.get(head_id.value, 0.3)

    # Base claim summarizing the prompt; confidence rises with domain
    # relevance (0.7-0.9, since relevance is clamped to [0, 1]).
    summary_claim = f"The prompt addresses: {analysis.intent}"
    if analysis.entities:
        summary_claim += f" involving {', '.join(analysis.entities[:3])}"
    claims.append(
        HeadClaim(
            claim_text=summary_claim,
            confidence=0.7 + relevance * 0.2,
            evidence=[Citation(source_id="prompt_analysis", excerpt=prompt[:200], confidence=1.0)],
            assumptions=[],
        )
    )

    # Head-specific domain claims.
    if head_id == HeadId.LOGIC:
        claims.append(
            HeadClaim(
                claim_text="Logical consistency should be verified for any derived conclusions.",
                confidence=0.8,
                evidence=[],
                assumptions=["Formal reasoning applies"],
            )
        )
    elif head_id == HeadId.SECURITY:
        # Only raise a security claim when the prompt actually signals it.
        if analysis.domain_signals.get(HeadId.SECURITY.value, 0) > 0.2:
            claims.append(
                HeadClaim(
                    claim_text="Security implications should be explicitly evaluated.",
                    confidence=0.85,
                    evidence=[],
                    assumptions=[],
                )
            )
    elif head_id == HeadId.SAFETY:
        claims.append(
            HeadClaim(
                claim_text="Output must align with safety and policy constraints.",
                confidence=0.9,
                evidence=[],
                assumptions=[],
            )
        )
    elif head_id == HeadId.STRATEGY and analysis.constraints:
        claims.append(
            HeadClaim(
                claim_text=f"Constraints identified: {'; '.join(analysis.constraints[:2])}.",
                confidence=0.75,
                evidence=[],
                assumptions=[],
            )
        )

    # Memory-augmented: ground up to three stored semantic facts as claims,
    # flagged for currency verification.
    if semantic_facts:
        for fact in semantic_facts[:3]:
            stmt = fact.get("statement", "")
            if stmt:
                claims.append(
                    HeadClaim(
                        claim_text=stmt[:200],
                        confidence=0.6,
                        evidence=[
                            Citation(
                                source_id=fact.get("source", "semantic_memory"),
                                excerpt=stmt[:100],
                                confidence=0.8,
                            )
                        ],
                        assumptions=["Stored fact; verify currency"],
                    )
                )
    return claims
def _derive_risks_for_head(head_id: HeadId, analysis: PromptAnalysis) -> list[HeadRisk]:
    """Identify domain-specific risks from the head's relevance signal."""
    relevance = analysis.domain_signals.get(head_id.value, 0.3)

    # Collect (description, severity) pairs, then materialize HeadRisk objects.
    flagged: list[tuple[str, str]] = []
    if relevance < 0.2:
        flagged.append(
            ("Low domain relevance; analysis may be shallow for this head.", "low")
        )
    if head_id == HeadId.SECURITY and relevance > 0.3:
        flagged.append(
            ("Security-sensitive topic; require explicit threat assessment.", "high")
        )
    if head_id == HeadId.SAFETY:
        flagged.append(
            ("Safety review recommended before deployment.", "medium")
        )
    return [HeadRisk(description=desc, severity=sev) for desc, sev in flagged]
def _synthesize_summary(head_id: HeadId, analysis: PromptAnalysis, claims: list[HeadClaim]) -> str:
    """Compose a persona-flavored summary sentence set from claims and analysis."""
    persona = get_persona(head_id)
    tone = persona.get("tone", "balanced")
    expression = persona.get("expression", "neutral")
    head_name = head_id.value.replace("_", " ").title()

    pieces: list[str] = []
    if claims:
        primary = claims[0].claim_text
        # Truncate the lead claim at 120 chars, marking the cut with an ellipsis.
        suffix = "..." if len(primary) > 120 else ""
        pieces.append(f"From {expression} perspective: {primary[:120]}{suffix}.")
    if analysis.questions:
        pieces.append(f"Addresses {len(analysis.questions)} explicit question(s).")
    if analysis.constraints:
        pieces.append(f"Constraints noted: {len(analysis.constraints)}.")
    if not pieces:
        # Fallback when nothing concrete was extracted.
        pieces.append(f"{head_name} head analysis: prompt analyzed with {tone} assessment.")
    return " ".join(pieces)
def produce_head_output(
    head_id: HeadId,
    prompt: str,
    semantic_facts: list[dict[str, Any]] | None = None,
) -> HeadOutput:
    """
    Produce a structured HeadOutput using native reasoning only.

    No external LLM calls: symbolic prompt analysis, per-head domain
    logic, and persona-driven synthesis.

    Raises:
        ValueError: if called for the WITNESS head (handled by WitnessAgent).
    """
    # Guard: Witness is special-cased and never yields a HeadOutput.
    if head_id == HeadId.WITNESS:
        raise ValueError("Witness does not produce HeadOutput; use WitnessAgent")

    analysis = analyze_prompt(prompt)
    claims = _derive_claims_for_head(head_id, analysis, prompt, semantic_facts)
    risks = _derive_risks_for_head(head_id, analysis)
    summary = _synthesize_summary(head_id, analysis, claims)

    # Recommended actions follow directly from what the analysis surfaced.
    actions: list[str] = []
    if analysis.questions:
        actions.append("Address each explicit question in the response.")
    if analysis.constraints:
        actions.append("Verify output satisfies stated constraints.")
    if head_id in (HeadId.SECURITY, HeadId.SAFETY):
        actions.append("Perform domain-specific review before finalizing.")

    return HeadOutput(
        head_id=head_id,
        summary=summary,
        claims=claims,
        risks=risks,
        questions=analysis.questions[:3],
        recommended_actions=actions[:5] or ["Proceed with synthesis."],
        tone_guidance=get_persona(head_id).get("tone", "balanced"),
    )
def _domain_for_head(head_id: HeadId) -> str:
"""Map head to semantic memory domain."""
return head_id.value
class NativeReasoningProvider:
    """
    Native reasoning provider: yields HeadOutput with no external APIs.

    Memory integration is optional. When a semantic memory is supplied,
    domain-relevant facts are retrieved to ground claims; an episodic
    memory is accepted for future use (referencing similar past outcomes).
    """

    def __init__(
        self,
        semantic_memory: "SemanticMemory | None" = None,
        episodic_memory: "EpisodicMemory | None" = None,
    ) -> None:
        self._semantic = semantic_memory
        self._episodic = episodic_memory

    def produce_head_output(self, head_id: HeadId, prompt: str) -> HeadOutput:
        """Produce HeadOutput for the given head and prompt."""
        facts = self._get_relevant_facts(head_id) if self._semantic else None
        return produce_head_output(head_id, prompt, semantic_facts=facts)

    def _get_relevant_facts(self, head_id: HeadId, limit: int = 5) -> list[dict[str, Any]]:
        """Retrieve up to *limit* domain-relevant facts from semantic memory."""
        if not self._semantic:
            # No memory configured: nothing to ground claims with.
            return []
        return self._semantic.query(domain=_domain_for_head(head_id), limit=limit)