""" Native reasoning engine: symbolic, rule-based analysis independent of external LLMs. Produces structured HeadOutput from prompt analysis using: - Keyword and pattern extraction - Head-specific domain logic - Persona-driven synthesis - No external API calls """ from __future__ import annotations import re from dataclasses import dataclass, field from typing import Any from fusionagi.config.head_personas import get_persona from fusionagi.schemas.grounding import Citation from fusionagi.schemas.head import HeadClaim, HeadId, HeadOutput, HeadRisk @dataclass class PromptAnalysis: """Structured analysis of a user prompt from native reasoning.""" intent: str = "" entities: list[str] = field(default_factory=list) constraints: list[str] = field(default_factory=list) questions: list[str] = field(default_factory=list) domain_signals: dict[str, float] = field(default_factory=dict) keywords: set[str] = field(default_factory=set) # Domain keywords per head: presence boosts relevance and shapes claims HEAD_DOMAIN_KEYWORDS: dict[HeadId, set[str]] = { HeadId.LOGIC: {"logic", "contradiction", "correct", "valid", "proof", "assumption", "therefore", "implies"}, HeadId.RESEARCH: {"source", "cite", "reference", "evidence", "study", "paper", "find", "search"}, HeadId.SYSTEMS: {"architecture", "scalability", "dependency", "system", "design", "component", "service"}, HeadId.STRATEGY: {"strategy", "roadmap", "priority", "tradeoff", "plan", "goal", "long-term"}, HeadId.PRODUCT: {"user", "ux", "product", "flow", "design", "experience", "interface"}, HeadId.SECURITY: {"security", "auth", "threat", "vulnerability", "secret", "encrypt", "attack"}, HeadId.SAFETY: {"safety", "harm", "policy", "ethical", "comply", "risk", "prevent"}, HeadId.RELIABILITY: {"reliability", "slo", "failover", "observability", "test", "load", "uptime"}, HeadId.COST: {"cost", "budget", "performance", "cache", "token", "efficient", "expensive"}, HeadId.DATA: {"data", "schema", "privacy", "retention", "memory", "storage", "database"}, HeadId.DEVEX: {"dev", "ci", "cd", "test", "tooling", "local", "developer", "workflow"}, } def _extract_content(text: str) -> str: """Normalize and extract analyzable content from prompt.""" if not text: return "" # Collapse whitespace, strip return " ".join(text.split()).strip() def analyze_prompt(prompt: str) -> PromptAnalysis: """ Analyze prompt using pattern matching and keyword extraction. No external APIs; pure symbolic reasoning. """ content = _extract_content(prompt).lower() words = set(re.findall(r"\b[a-z0-9]{2,}\b", content)) analysis = PromptAnalysis(keywords=words) # Intent: question vs statement vs request if "?" in prompt: analysis.intent = "question" # Extract explicit questions q_parts = re.split(r"\?+", prompt) for part in q_parts[:-1]: q = part.strip() if len(q) > 10: analysis.questions.append(q + "?") elif any(w in content for w in ["how", "what", "why", "when", "where", "who"]): analysis.intent = "question" else: analysis.intent = "statement" if len(prompt.split()) > 5 else "request" # Entity-like phrases (title case or quoted) entities = re.findall(r'"([^"]+)"', prompt) entities += re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", prompt) analysis.entities = list(dict.fromkeys(e for e in entities if len(e) > 2))[:10] # Constraint signals constraint_patterns = [ r"must\s+(\w[\w\s]+?)(?:\.|$)", r"should\s+(\w[\w\s]+?)(?:\.|$)", r"cannot\s+(\w[\w\s]+?)(?:\.|$)", r"require[sd]?\s+(\w[\w\s]+?)(?:\.|$)", r"constraint[s]?:\s*(\w[\w\s]+?)(?:\.|$)", ] for pat in constraint_patterns: for m in re.finditer(pat, prompt, re.I): analysis.constraints.append(m.group(1).strip()) # Domain relevance per head for hid, keywords in HEAD_DOMAIN_KEYWORDS.items(): if hid == HeadId.WITNESS: continue overlap = len(words & keywords) / max(len(keywords), 1) analysis.domain_signals[hid.value] = min(1.0, overlap * 3) return analysis def _derive_claims_for_head( head_id: HeadId, analysis: PromptAnalysis, prompt: str, semantic_facts: list[dict[str, Any]] | None = None, ) -> list[HeadClaim]: """Derive atomic claims from analysis based on head domain.""" claims: list[HeadClaim] = [] persona = get_persona(head_id) relevance = analysis.domain_signals.get(head_id.value, 0.3) # Base claim from prompt summary summary_claim = f"The prompt addresses: {analysis.intent}" if analysis.entities: summary_claim += f" involving {', '.join(analysis.entities[:3])}" claims.append( HeadClaim( claim_text=summary_claim, confidence=0.7 + relevance * 0.2, evidence=[Citation(source_id="prompt_analysis", excerpt=prompt[:200], confidence=1.0)], assumptions=[], ) ) # Domain-specific claims if head_id == HeadId.LOGIC: claims.append( HeadClaim( claim_text="Logical consistency should be verified for any derived conclusions.", confidence=0.8, evidence=[], assumptions=["Formal reasoning applies"], ) ) elif head_id == HeadId.SECURITY: if analysis.domain_signals.get(HeadId.SECURITY.value, 0) > 0.2: claims.append( HeadClaim( claim_text="Security implications should be explicitly evaluated.", confidence=0.85, evidence=[], assumptions=[], ) ) elif head_id == HeadId.SAFETY: claims.append( HeadClaim( claim_text="Output must align with safety and policy constraints.", confidence=0.9, evidence=[], assumptions=[], ) ) elif head_id == HeadId.STRATEGY and analysis.constraints: claims.append( HeadClaim( claim_text=f"Constraints identified: {'; '.join(analysis.constraints[:2])}.", confidence=0.75, evidence=[], assumptions=[], ) ) # Memory-augmented: add claims from semantic facts when available if semantic_facts: for fact in semantic_facts[:3]: stmt = fact.get("statement", "") if stmt: claims.append( HeadClaim( claim_text=stmt[:200], confidence=0.6, evidence=[ Citation( source_id=fact.get("source", "semantic_memory"), excerpt=stmt[:100], confidence=0.8, ) ], assumptions=["Stored fact; verify currency"], ) ) return claims def _derive_risks_for_head(head_id: HeadId, analysis: PromptAnalysis) -> list[HeadRisk]: """Identify risks based on head domain and analysis.""" risks: list[HeadRisk] = [] relevance = analysis.domain_signals.get(head_id.value, 0.3) if relevance < 0.2: risks.append( HeadRisk( description="Low domain relevance; analysis may be shallow for this head.", severity="low", ) ) if head_id == HeadId.SECURITY and relevance > 0.3: risks.append( HeadRisk( description="Security-sensitive topic; require explicit threat assessment.", severity="high", ) ) if head_id == HeadId.SAFETY: risks.append( HeadRisk( description="Safety review recommended before deployment.", severity="medium", ) ) return risks def _synthesize_summary(head_id: HeadId, analysis: PromptAnalysis, claims: list[HeadClaim]) -> str: """Synthesize persona-appropriate summary from claims and analysis.""" persona = get_persona(head_id) tone = persona.get("tone", "balanced") expression = persona.get("expression", "neutral") head_name = head_id.value.replace("_", " ").title() parts: list[str] = [] if claims: primary = claims[0].claim_text parts.append(f"From {expression} perspective: {primary[:120]}{'...' if len(primary) > 120 else ''}.") if analysis.questions: parts.append(f"Addresses {len(analysis.questions)} explicit question(s).") if analysis.constraints: parts.append(f"Constraints noted: {len(analysis.constraints)}.") if not parts: parts.append(f"{head_name} head analysis: prompt analyzed with {tone} assessment.") return " ".join(parts) def produce_head_output( head_id: HeadId, prompt: str, semantic_facts: list[dict[str, Any]] | None = None, ) -> HeadOutput: """ Produce structured HeadOutput using native reasoning only. No external LLM calls. Uses symbolic analysis, domain logic, and persona-driven synthesis. """ if head_id == HeadId.WITNESS: raise ValueError("Witness does not produce HeadOutput; use WitnessAgent") analysis = analyze_prompt(prompt) claims = _derive_claims_for_head(head_id, analysis, prompt, semantic_facts) risks = _derive_risks_for_head(head_id, analysis) summary = _synthesize_summary(head_id, analysis, claims) # Recommended actions from analysis actions: list[str] = [] if analysis.questions: actions.append("Address each explicit question in the response.") if analysis.constraints: actions.append("Verify output satisfies stated constraints.") if head_id in (HeadId.SECURITY, HeadId.SAFETY): actions.append("Perform domain-specific review before finalizing.") return HeadOutput( head_id=head_id, summary=summary, claims=claims, risks=risks, questions=analysis.questions[:3] if analysis.questions else [], recommended_actions=actions[:5] or ["Proceed with synthesis."], tone_guidance=get_persona(head_id).get("tone", "balanced"), ) def _domain_for_head(head_id: HeadId) -> str: """Map head to semantic memory domain.""" return head_id.value class NativeReasoningProvider: """ Provider for native reasoning: produces HeadOutput without external APIs. Optional memory integration: when semantic_memory is provided, retrieves relevant facts to ground claims. When episodic_memory is provided, can reference similar past outcomes (future: full retrieval). """ def __init__( self, semantic_memory: "SemanticMemory | None" = None, episodic_memory: "EpisodicMemory | None" = None, ) -> None: self._semantic = semantic_memory self._episodic = episodic_memory def produce_head_output(self, head_id: HeadId, prompt: str) -> HeadOutput: """Produce HeadOutput for the given head and prompt.""" return produce_head_output( head_id, prompt, semantic_facts=self._get_relevant_facts(head_id) if self._semantic else None, ) def _get_relevant_facts(self, head_id: HeadId, limit: int = 5) -> list[dict[str, Any]]: """Retrieve domain-relevant facts from semantic memory.""" if not self._semantic: return [] domain = _domain_for_head(head_id) return self._semantic.query(domain=domain, limit=limit)