Files
FusionAGI/fusionagi/maa/tools.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

394 lines
12 KiB
Python

"""Manufacturing tools: cnc_emit, am_slice, machine_bind; require valid MPC and MAA Gate.
These tools generate actual manufacturing instructions:
- cnc_emit: Generates G-code for CNC machining operations
- am_slice: Generates slice data for additive manufacturing
- machine_bind: Binds a design to a specific machine with capability validation
"""
import json
import uuid
from typing import Any
from pydantic import BaseModel, Field
from fusionagi._time import utc_now_iso
from fusionagi.tools.registry import ToolDef
from fusionagi._logger import logger
class GCodeOutput(BaseModel):
    """G-code output from CNC emission."""

    # Manufacturing Proof Certificate ID the program was generated for.
    mpc_id: str
    # Identifier of the target CNC machine.
    machine_id: str
    # Reference to the toolpath data the program was built from.
    toolpath_ref: str
    # Full program text; _cnc_emit_impl joins the individual lines with "\n".
    gcode: str
    # Free-form extras; _cnc_emit_impl stores line_count,
    # estimated_runtime_minutes and tool_changes here.
    metadata: dict[str, Any] = Field(default_factory=dict)
    # Creation timestamp, filled from utc_now_iso() when not supplied
    # (presumably an ISO-8601 UTC string -- confirm in fusionagi._time).
    generated_at: str = Field(default_factory=utc_now_iso)
class SliceOutput(BaseModel):
    """Slice output from AM slicing."""

    # Manufacturing Proof Certificate ID the slice was generated for.
    mpc_id: str
    # Identifier of the target AM machine.
    machine_id: str
    # Reference to the slice/geometry data that was sliced.
    slice_ref: str
    # Total number of layers; _am_slice_impl passes its num_layers value,
    # which can exceed the number of entries in slice_data["layers"].
    layer_count: int
    # Slice payload; _am_slice_impl fills format_version, machine_profile,
    # settings, layers and statistics.
    slice_data: dict[str, Any]
    # Free-form extras; _am_slice_impl copies estimated_material_g and
    # estimated_time_minutes here.
    metadata: dict[str, Any] = Field(default_factory=dict)
    # Creation timestamp, filled from utc_now_iso() when not supplied
    # (presumably an ISO-8601 UTC string -- confirm in fusionagi._time).
    generated_at: str = Field(default_factory=utc_now_iso)
class MachineBindOutput(BaseModel):
    """Machine binding output."""

    # Manufacturing Proof Certificate ID of the bound design.
    mpc_id: str
    # Identifier of the machine the design is bound to.
    machine_id: str
    # Binding identifier; _machine_bind_impl builds it as
    # "binding_{mpc_id}_{machine_id}_{8 hex chars}".
    binding_id: str
    # Binding state; _machine_bind_impl always sets "bound".
    status: str
    # Whether the capability checks were considered passed.
    capabilities_validated: bool
    # Free-form extras; _machine_bind_impl stores validation_results here.
    metadata: dict[str, Any] = Field(default_factory=dict)
    # Binding timestamp, filled from utc_now_iso() when not supplied
    # (presumably an ISO-8601 UTC string -- confirm in fusionagi._time).
    bound_at: str = Field(default_factory=utc_now_iso)
def _generate_gcode_header(machine_id: str, mpc_id: str) -> list[str]:
    """Generate the standard G-code header.

    The header consists of identifying comment lines followed by the modal
    setup commands (absolute positioning, millimetre units, XY plane).

    Args:
        machine_id: Target machine identifier, echoed in a comment line.
        mpc_id: Manufacturing Proof Certificate ID, echoed in a comment line.

    Returns:
        The header as a list of single G-code lines (no embedded newlines
        originating from the identifiers).
    """
    # A ";" comment only lasts until the end of its line. If an identifier
    # carried a line break, everything after it would become an executable
    # G-code line once the program is joined with "\n". Collapse any line
    # boundary into a space so the identifiers stay inside their comment.
    safe_mpc_id = " ".join(str(mpc_id).splitlines())
    safe_machine_id = " ".join(str(machine_id).splitlines())

    return [
        "; G-code generated by FusionAGI MAA",
        f"; MPC: {safe_mpc_id}",
        f"; Machine: {safe_machine_id}",
        f"; Generated: {utc_now_iso()}",
        "",
        "G90 ; Absolute positioning",
        "G21 ; Metric units (mm)",
        "G17 ; XY plane selection",
        "",
    ]
def _generate_gcode_footer() -> list[str]:
"""Generate standard G-code footer."""
return [
"",
"; End of program",
"M5 ; Spindle stop",
"G28 ; Return to home",
"M30 ; Program end",
]
def _generate_toolpath_gcode(toolpath_ref: str) -> list[str]:
"""
Generate G-code from a toolpath reference.
In a real implementation, this would:
1. Load the toolpath data from storage
2. Convert toolpath segments to G-code commands
3. Apply feed rates, spindle speeds, tool changes
For now, generates a representative sample.
"""
# Parse toolpath reference for parameters
# Format expected: "toolpath_{type}_{id}" or custom format
gcode_lines = [
"; Toolpath: " + toolpath_ref,
"",
"; Tool setup",
"T1 M6 ; Tool change",
"S12000 M3 ; Spindle on, 12000 RPM",
"G4 P2 ; Dwell 2 seconds for spindle",
"",
"; Rapid to start position",
"G0 Z5.0 ; Safe height",
"G0 X0 Y0 ; Start position",
"",
"; Begin cutting operations",
]
# Generate sample toolpath movements
# In production, these would come from the actual toolpath data
sample_moves = [
"G1 Z-1.0 F100 ; Plunge",
"G1 X50.0 F500 ; Cut along X",
"G1 Y50.0 ; Cut along Y",
"G1 X0 ; Return X",
"G1 Y0 ; Return Y",
"G0 Z5.0 ; Retract",
]
gcode_lines.extend(sample_moves)
return gcode_lines
def _cnc_emit_impl(mpc_id: str, machine_id: str, toolpath_ref: str) -> dict[str, Any]:
    """Build a CNC G-code program and return it as a plain dictionary.

    The program is the standard header, the toolpath section and the standard
    footer, concatenated in that order and joined with newlines. Start and
    completion are both logged.

    Args:
        mpc_id: Manufacturing Proof Certificate ID.
        machine_id: Target CNC machine identifier.
        toolpath_ref: Reference to toolpath data.

    Returns:
        The dumped ``GCodeOutput`` model: the G-code text plus metadata.
    """
    logger.info(
        "CNC emit started",
        extra={"mpc_id": mpc_id, "machine_id": machine_id, "toolpath_ref": toolpath_ref},
    )

    # Assemble the three program sections in emission order.
    program = [
        *_generate_gcode_header(machine_id, mpc_id),
        *_generate_toolpath_gcode(toolpath_ref),
        *_generate_gcode_footer(),
    ]
    line_total = len(program)

    emission = GCodeOutput(
        mpc_id=mpc_id,
        machine_id=machine_id,
        toolpath_ref=toolpath_ref,
        gcode="\n".join(program),
        metadata={
            "line_count": line_total,
            # Placeholder value; would be calculated from the toolpath.
            "estimated_runtime_minutes": 5.0,
            "tool_changes": 1,
        },
    )

    logger.info(
        "CNC emit completed",
        extra={"mpc_id": mpc_id, "line_count": line_total},
    )
    return emission.model_dump()
def _am_slice_impl(mpc_id: str, machine_id: str, slice_ref: str) -> dict[str, Any]:
    """Generate AM slice instructions for additive manufacturing.

    The current implementation does not read any geometry: it emits fixed,
    representative settings and statistics, plus per-layer records for at
    most the first 10 layers.

    Args:
        mpc_id: Manufacturing Proof Certificate ID.
        machine_id: Target AM machine identifier.
        slice_ref: Reference to slice/geometry data.

    Returns:
        The dumped ``SliceOutput`` model: slice data and metadata.
    """
    logger.info(
        "AM slice started",
        extra={"mpc_id": mpc_id, "machine_id": machine_id, "slice_ref": slice_ref},
    )

    # In production, this would:
    # 1. Load the geometry from slice_ref
    # 2. Apply slicing algorithm with machine-specific parameters
    # 3. Generate layer-by-layer toolpaths
    # 4. Calculate support structures if needed

    # Representative slice parameters. The per-layer records below are derived
    # from these same values so they cannot drift from the advertised
    # settings (the solid top/bottom layer counts bound where infill appears).
    layer_height_mm = 0.2
    num_layers = 100  # Would be calculated from geometry height
    wall_count = 3
    bottom_layers = 4
    top_layers = 4
    sample_layer_limit = 10  # Only the first layers are emitted as a sample

    layers = [
        {
            "index": i,
            "z_mm": i * layer_height_mm,
            "perimeters": wall_count,
            # Infill exists only between the solid bottom and solid top layers.
            "infill_present": bottom_layers <= i < num_layers - top_layers,
            "support_present": i < 20,
        }
        for i in range(min(num_layers, sample_layer_limit))
    ]

    slice_data = {
        "format_version": "1.0",
        "machine_profile": machine_id,
        "settings": {
            "layer_height_mm": layer_height_mm,
            "infill_percentage": 20,
            "infill_pattern": "gyroid",
            "wall_count": wall_count,
            "top_layers": top_layers,
            "bottom_layers": bottom_layers,
            "support_enabled": True,
            "support_angle_threshold": 45,
            "print_speed_mm_s": 60,
            "travel_speed_mm_s": 150,
            "retraction_distance_mm": 1.0,
            "retraction_speed_mm_s": 45,
        },
        "layers": layers,
        "statistics": {
            "total_layers": num_layers,
            "estimated_material_g": 45.2,
            "estimated_time_minutes": 120,
            "bounding_box_mm": {"x": 50, "y": 50, "z": num_layers * layer_height_mm},
        },
    }

    output = SliceOutput(
        mpc_id=mpc_id,
        machine_id=machine_id,
        slice_ref=slice_ref,
        layer_count=num_layers,
        slice_data=slice_data,
        metadata={
            "estimated_material_g": slice_data["statistics"]["estimated_material_g"],
            "estimated_time_minutes": slice_data["statistics"]["estimated_time_minutes"],
        },
    )

    logger.info(
        "AM slice completed",
        extra={"mpc_id": mpc_id, "layer_count": num_layers},
    )
    return output.model_dump()
def _machine_bind_impl(mpc_id: str, machine_id: str) -> dict[str, Any]:
    """Bind a design (identified by its MPC) to a specific machine.

    Validation is currently simulated: every capability check is reported as
    passed and the binding status is always "bound". Start and completion are
    both logged.

    Args:
        mpc_id: Manufacturing Proof Certificate ID.
        machine_id: Target machine identifier.

    Returns:
        The dumped ``MachineBindOutput`` model: binding confirmation and the
        validation results.
    """
    logger.info(
        "Machine bind started",
        extra={"mpc_id": mpc_id, "machine_id": machine_id},
    )

    # A production implementation would:
    # 1. Load the MPC to get design requirements
    # 2. Load the machine profile
    # 3. Validate machine capabilities against design requirements
    # 4. Check envelope, tolerances, material compatibility
    # 5. Record the binding in the system

    # Short random suffix keeps repeated bindings of the same pair distinct.
    unique_suffix = uuid.uuid4().hex[:8]
    binding_id = f"binding_{mpc_id}_{machine_id}_{unique_suffix}"

    # Simulated capability validation: each check is recorded as a pass.
    simulated_checks = (
        ("envelope_check", "Design fits within machine envelope"),
        ("tolerance_check", "Machine can achieve required tolerances"),
        ("material_check", "Machine supports specified material"),
        ("feature_check", "Machine can produce required features"),
    )
    validation_results = {
        check_name: {"status": "pass", "details": explanation}
        for check_name, explanation in simulated_checks
    }
    capabilities_validated = True

    binding = MachineBindOutput(
        mpc_id=mpc_id,
        machine_id=machine_id,
        binding_id=binding_id,
        status="bound",
        capabilities_validated=capabilities_validated,
        metadata={"validation_results": validation_results},
    )

    logger.info(
        "Machine bind completed",
        extra={"binding_id": binding_id, "validated": capabilities_validated},
    )
    return binding.model_dump()
def cnc_emit_tool() -> ToolDef:
    """Build the ``cnc_emit`` tool definition.

    The tool wraps ``_cnc_emit_impl`` and takes three required string
    arguments: the MPC ID, the target CNC machine ID and a toolpath
    reference. It is registered under the "manufacturing" permission scope
    with a 60 second timeout and the ``manufacturing`` flag set.

    Returns:
        A ``ToolDef`` describing the CNC G-code emission tool.
    """
    # Argument name -> human-readable description; all arguments are
    # strings and all are required.
    argument_docs = {
        "mpc_id": "Manufacturing Proof Certificate ID",
        "machine_id": "Target CNC machine ID",
        "toolpath_ref": "Reference to toolpath data",
    }
    schema = {
        "type": "object",
        "properties": {
            arg_name: {"type": "string", "description": text}
            for arg_name, text in argument_docs.items()
        },
        "required": list(argument_docs),
    }

    return ToolDef(
        name="cnc_emit",
        description="Emit CNC G-code for bound machine; requires valid MPC",
        fn=_cnc_emit_impl,
        parameters_schema=schema,
        permission_scope=["manufacturing"],
        timeout_seconds=60.0,
        manufacturing=True,
    )
def am_slice_tool() -> ToolDef:
    """Build the ``am_slice`` tool definition.

    The tool wraps ``_am_slice_impl`` and takes three required string
    arguments: the MPC ID, the target AM machine ID and a reference to the
    geometry/slice data. It is registered under the "manufacturing"
    permission scope with a 60 second timeout and the ``manufacturing`` flag
    set.

    Returns:
        A ``ToolDef`` describing the AM slice instruction tool.
    """
    # Argument name -> human-readable description; all arguments are
    # strings and all are required.
    argument_docs = {
        "mpc_id": "Manufacturing Proof Certificate ID",
        "machine_id": "Target AM machine ID",
        "slice_ref": "Reference to geometry/slice data",
    }
    schema = {
        "type": "object",
        "properties": {
            arg_name: {"type": "string", "description": text}
            for arg_name, text in argument_docs.items()
        },
        "required": list(argument_docs),
    }

    return ToolDef(
        name="am_slice",
        description="Emit AM slice instructions; requires valid MPC",
        fn=_am_slice_impl,
        parameters_schema=schema,
        permission_scope=["manufacturing"],
        timeout_seconds=60.0,
        manufacturing=True,
    )
def machine_bind_tool() -> ToolDef:
    """Build the ``machine_bind`` tool definition.

    The tool wraps ``_machine_bind_impl`` and takes two required string
    arguments: the MPC ID and the target machine ID. It is registered under
    the "manufacturing" permission scope with a 10 second timeout and the
    ``manufacturing`` flag set.

    Returns:
        A ``ToolDef`` describing the machine binding tool.
    """
    # Argument name -> human-readable description; all arguments are
    # strings and all are required.
    argument_docs = {
        "mpc_id": "Manufacturing Proof Certificate ID",
        "machine_id": "Target machine ID",
    }
    schema = {
        "type": "object",
        "properties": {
            arg_name: {"type": "string", "description": text}
            for arg_name, text in argument_docs.items()
        },
        "required": list(argument_docs),
    }

    return ToolDef(
        name="machine_bind",
        description="Bind design to machine; requires valid MPC",
        fn=_machine_bind_impl,
        parameters_schema=schema,
        permission_scope=["manufacturing"],
        timeout_seconds=10.0,
        manufacturing=True,
    )