29 KiB
29 KiB
Performance Optimization Guide
This guide provides comprehensive strategies for optimizing NowYouSeeMe performance across all system components. Follow these guidelines to achieve the best possible performance for your specific use case.
🎯 Performance Targets
Real-time Requirements
| Metric | Target | Acceptable Range | Critical |
|---|---|---|---|
| Latency | <20ms | 15-25ms | >30ms |
| Accuracy | <10cm | 8-15cm | >20cm |
| Frame Rate | 30-60 FPS | 25-60 FPS | <20 FPS |
| CSI Rate | ≥100 pkt/s | 80-120 pkt/s | <50 pkt/s |
Resource Utilization
| Component | CPU Target | GPU Target | Memory Target |
|---|---|---|---|
| Camera Capture | <10% | N/A | <500MB |
| CSI Processing | <15% | N/A | <1GB |
| Vision SLAM | <40% | <60% | <2GB |
| RF SLAM | <20% | N/A | <1GB |
| Sensor Fusion | <15% | <20% | <1GB |
| Rendering | <10% | <80% | <2GB |
🔧 Hardware Optimization
GPU Configuration
NVIDIA GPU Setup
# Check GPU status
nvidia-smi
# Set GPU power management
sudo nvidia-smi -pm 1
# Set GPU memory allocation
export CUDA_VISIBLE_DEVICES=0
export CUDA_MEMORY_FRACTION=0.8
# Optimize GPU settings
nvidia-settings --assign GPUPowerMizerMode=1
GPU Memory Optimization
# In your application
import torch
import cupy as cp
# Set memory fraction
torch.cuda.set_per_process_memory_fraction(0.8)
# Clear cache periodically
torch.cuda.empty_cache()
cp.get_default_memory_pool().free_all_blocks()
CPU Optimization
Multi-threading Configuration
# Configure thread pools
import multiprocessing as mp
# Set optimal thread count
optimal_threads = min(mp.cpu_count(), 8)
mp.set_start_method('spawn', force=True)
# Configure OpenMP
import os
os.environ['OMP_NUM_THREADS'] = str(optimal_threads)
os.environ['MKL_NUM_THREADS'] = str(optimal_threads)
CPU Affinity
# Set CPU affinity for critical processes
sudo taskset -cp 0-3 <process_id>
# Or in Python
import os
os.sched_setaffinity(0, {0, 1, 2, 3})
Memory Optimization
Memory Management
# Monitor memory usage
import psutil
import gc
def optimize_memory():
    """Free cached memory and report the current process RSS.

    Runs a full garbage-collection pass, releases the CUDA allocator
    cache (only when a GPU is actually present), then prints and
    returns the resident memory of this process in megabytes.

    Returns:
        float: current resident set size in MB.
    """
    # Collect garbage first so freed tensors become reclaimable by the
    # CUDA cache release below.
    gc.collect()
    # Fix: the original called empty_cache() unconditionally, which
    # raises on CPU-only builds/machines.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # Report the process resident set size.
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"Memory usage: {memory_mb:.1f} MB")
    return memory_mb
Memory Pooling
# Use memory pools for frequent allocations
import numpy as np
from memory_profiler import profile
class MemoryPool:
    """Reuse numpy buffers to avoid repeated allocations.

    Arrays handed back via :meth:`return_array` are kept (flattened) and
    handed out again by :meth:`get_array` when their element count and
    dtype match the request; otherwise a fresh zero-filled array is
    allocated. Note: reused buffers are NOT re-zeroed (matching the
    original behavior), so callers must overwrite their contents.
    """

    def __init__(self, size=1000):
        # Flattened spare buffers available for reuse.
        self.pool = []
        # Maximum number of buffers retained at once.
        self.size = size

    def get_array(self, shape, dtype=np.float32):
        """Return an array of ``shape``/``dtype``, reusing a pooled buffer when possible."""
        needed = int(np.prod(shape))
        # Fix: the original popped unconditionally, which raised in
        # reshape() whenever the pooled buffer's element count differed
        # from the request, and silently ignored the requested dtype.
        for i, candidate in enumerate(self.pool):
            if candidate.size == needed and candidate.dtype == np.dtype(dtype):
                return self.pool.pop(i).reshape(shape)
        return np.zeros(shape, dtype=dtype)

    def return_array(self, array):
        """Hand a buffer back to the pool (dropped when the pool is full)."""
        if len(self.pool) < self.size:
            self.pool.append(array.flatten())
📊 Performance Monitoring
Real-time Monitoring
import time
import threading
from collections import deque
class PerformanceMonitor:
    """Background sampler of system/application performance metrics.

    Keeps a rolling window (last 100 samples) per metric and can turn
    the averages into a status report with plain-text recommendations.
    Sampling runs at roughly 10 Hz on a daemon thread between
    :meth:`start_monitoring` and :meth:`stop_monitoring`.
    """

    def __init__(self):
        # Rolling windows: deque(maxlen=100) keeps only the latest samples.
        self.metrics = {
            'latency': deque(maxlen=100),
            'fps': deque(maxlen=100),
            'accuracy': deque(maxlen=100),
            'cpu_usage': deque(maxlen=100),
            'gpu_usage': deque(maxlen=100),
            'memory_usage': deque(maxlen=100),
        }
        self.running = False
        self.monitor_thread = None
        # Lazily-initialized NVML device handle: None until the first
        # successful init, False once init has failed (stop retrying).
        self._nvml_handle = None

    def start_monitoring(self):
        """Start the background sampling thread."""
        self.running = True
        # daemon=True: a forgotten monitor must not keep the process alive.
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Signal the sampler to stop and wait for it to finish."""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor_loop(self):
        """Collect metrics at ~10 Hz until told to stop."""
        while self.running:
            self._collect_metrics()
            time.sleep(0.1)  # 10Hz monitoring

    def _collect_metrics(self):
        """Append one sample of CPU, memory, and (if available) GPU usage."""
        self.metrics['cpu_usage'].append(psutil.cpu_percent())
        self.metrics['memory_usage'].append(psutil.virtual_memory().percent)
        self.metrics['gpu_usage'].append(self._sample_gpu_usage())

    def _sample_gpu_usage(self):
        """Return GPU utilization %, or 0 when NVML is unavailable.

        Fix: the original re-ran ``nvmlInit`` on every 100 ms sample and
        used a bare ``except:`` (which also swallows KeyboardInterrupt);
        the handle is now cached and only ``Exception`` is caught.
        """
        if self._nvml_handle is False:
            return 0
        try:
            import pynvml
            if self._nvml_handle is None:
                pynvml.nvmlInit()
                self._nvml_handle = pynvml.nvmlDeviceGetHandleByIndex(0)
            return pynvml.nvmlDeviceGetUtilizationRates(self._nvml_handle).gpu
        except Exception:
            self._nvml_handle = False  # don't retry a missing/broken NVML
            return 0

    def get_average_metrics(self):
        """Average each metric over its current window (0 when empty)."""
        return {
            metric: sum(values) / len(values) if values else 0
            for metric, values in self.metrics.items()
        }

    def get_performance_report(self):
        """Summarize current averages into status + recommendations."""
        avg_metrics = self.get_average_metrics()
        return {
            'status': 'optimal' if self._check_targets(avg_metrics) else 'needs_optimization',
            'metrics': avg_metrics,
            'recommendations': self._generate_recommendations(avg_metrics),
        }

    def _check_targets(self, metrics):
        """True when latency < 20 ms, fps > 30 and accuracy < 10 cm.

        NOTE(review): missing metrics default to 0, which trivially
        satisfies the latency/accuracy targets — confirm that's intended.
        """
        return (
            metrics.get('latency', 0) < 20 and
            metrics.get('fps', 0) > 30 and
            metrics.get('accuracy', 0) < 10
        )

    def _generate_recommendations(self, metrics):
        """Map out-of-range averages to human-readable optimization hints."""
        recommendations = []
        if metrics.get('latency', 0) > 20:
            recommendations.append("High latency detected - consider reducing processing load")
        if metrics.get('fps', 0) < 30:
            recommendations.append("Low frame rate - check GPU utilization and rendering settings")
        if metrics.get('cpu_usage', 0) > 80:
            recommendations.append("High CPU usage - consider reducing thread count or processing quality")
        if metrics.get('memory_usage', 0) > 80:
            recommendations.append("High memory usage - consider clearing caches or reducing buffer sizes")
        return recommendations
Profiling Tools
CPU Profiling
import cProfile
import pstats
import io
def profile_function(func, *args, **kwargs):
    """Profile a single call of ``func`` and print its hottest entries.

    cProfile is enabled around one invocation; afterwards the 20 most
    expensive entries, sorted by cumulative time, are printed to stdout.
    The profiled call's return value is passed through unchanged.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    return_value = func(*args, **kwargs)
    profiler.disable()
    # Render the stats into a string buffer, then dump it in one go.
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats('cumulative')
    stats.print_stats(20)
    print(buffer.getvalue())
    return return_value
Memory Profiling
from memory_profiler import profile
@profile  # memory_profiler: prints a line-by-line memory report when run
def memory_intensive_function():
    """Placeholder demonstrating memory_profiler's ``@profile`` decorator.

    Replace the body with the allocation-heavy code you want measured;
    calling the function then prints per-line memory usage to stdout.
    """
    # Your memory-intensive code here
    pass
GPU Profiling
import torch
def profile_gpu_operations():
    """Trace CPU + CUDA activity for a code region and print a summary.

    Wrap the operations to be measured inside the profiler context;
    afterwards an aggregated table sorted by total CUDA time is printed.
    """
    traced_activities = [
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ]
    with torch.profiler.profile(
        activities=traced_activities,
        record_shapes=True,
        with_stack=True,
    ) as prof:
        # Your GPU operations here
        pass
    summary = prof.key_averages().table(sort_by="cuda_time_total")
    print(summary)
⚡ Algorithm Optimization
Vision SLAM Optimization
Feature Detection Optimization
import cv2
import numpy as np
class OptimizedFeatureDetector:
    """FAST-based keypoint detector capped at a fixed feature budget."""

    def __init__(self, max_features=1000, quality_level=0.01):
        # Upper bound on the number of keypoints returned per frame.
        self.max_features = max_features
        # Stored for API compatibility; FAST itself does not consume it.
        self.quality_level = quality_level
        self.detector = cv2.FastFeatureDetector_create(
            threshold=10,
            nonmaxSuppression=True
        )

    def detect_features(self, image):
        """Detect FAST keypoints, keeping only the strongest responses."""
        # FAST operates on single-channel images; convert colour input.
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image
        keypoints = self.detector.detect(gray)
        # When over budget, keep the keypoints with the highest response.
        if len(keypoints) > self.max_features:
            by_strength = sorted(keypoints, key=lambda kp: kp.response, reverse=True)
            keypoints = by_strength[:self.max_features]
        return keypoints
Tracking Optimization
class OptimizedTracker:
    """Frame-to-frame feature tracker based on pyramidal Lucas-Kanade flow."""

    def __init__(self):
        # State from the previous frame; None until the first call.
        self.prev_frame = None
        self.prev_keypoints = None
        self.prev_descriptors = None

    def track_features(self, frame, keypoints, descriptors):
        """Track the previous frame's keypoints into ``frame``.

        On the first call the detections are simply stored and returned.
        Afterwards optical flow propagates the previous keypoints; the
        successfully tracked subset (and its descriptors) is returned.

        Args:
            frame: current (grayscale) frame.
            keypoints: freshly detected keypoints for ``frame`` — used as
                a fallback when there is nothing to track yet.
            descriptors: descriptors for ``keypoints``; assumed to be a
                numpy array row-aligned with them — TODO confirm caller.
        """
        if self.prev_frame is None:
            self.prev_frame = frame
            self.prev_keypoints = keypoints
            self.prev_descriptors = descriptors
            return keypoints, descriptors
        # Use optical flow for fast tracking.
        if len(self.prev_keypoints) > 0:
            prev_pts = np.float32([kp.pt for kp in self.prev_keypoints]).reshape(-1, 1, 2)
            curr_pts, status, error = cv2.calcOpticalFlowPyrLK(
                self.prev_frame, frame, prev_pts, None
            )
            mask = status.ravel() == 1
            # Fix: the original applied boolean-ndarray indexing to Python
            # lists of cv2.KeyPoint (a TypeError) and selected from the
            # NEW detections using the OLD points' status (length
            # mismatch). Tracked keypoints are rebuilt at their flowed
            # positions, and the surviving PREVIOUS descriptors stay
            # row-aligned with them.
            good_new = [
                cv2.KeyPoint(float(pt[0][0]), float(pt[0][1]), kp.size)
                for pt, kp, ok in zip(curr_pts, self.prev_keypoints, mask)
                if ok
            ]
            self.prev_frame = frame
            self.prev_keypoints = good_new
            self.prev_descriptors = np.asarray(self.prev_descriptors)[mask]
            return good_new, self.prev_descriptors
        return keypoints, descriptors
RF SLAM Optimization
CSI Processing Optimization
import numpy as np
from scipy import signal
class OptimizedCSIProcessor:
    """Windowed-FFT CSI processing with a simplified MUSIC AoA estimator."""

    def __init__(self, sample_rate=1000, window_size=64):
        self.sample_rate = sample_rate
        self.window_size = window_size
        # Hann window suppresses spectral leakage before the FFT.
        self.window = signal.windows.hann(window_size)

    def process_csi_packet(self, csi_data):
        """Window one CSI packet and return its positive-frequency bins.

        Assumes ``len(csi_data) == window_size`` — TODO confirm upstream.
        """
        windowed_data = csi_data * self.window
        # Round the FFT length down to a power of two for speed.
        fft_size = 2 ** int(np.log2(len(windowed_data)))
        spectrum = np.fft.fft(windowed_data, fft_size)
        # Keep only the first half (real input -> conjugate-symmetric FFT).
        return spectrum[:fft_size // 2]

    def estimate_aoa(self, csi_packets):
        """Estimate angle of arrival (as a spectrum bin index) via MUSIC.

        NOTE(review): assumes 3 signal sources and a 4-element antenna
        array (see ``_music_spectrum``) — confirm against the deployment.
        """
        processed_packets = [self.process_csi_packet(packet) for packet in csi_packets]
        correlation_matrix = np.corrcoef(processed_packets)
        eigenvalues, eigenvectors = np.linalg.eigh(correlation_matrix)
        # eigh returns eigenvalues in ascending order, so all but the
        # last 3 columns (assumed signal) span the noise subspace.
        noise_subspace = eigenvectors[:, :-3]
        aoa_spectrum = self._music_spectrum(noise_subspace)
        return np.argmax(aoa_spectrum)

    def _music_spectrum(self, noise_subspace):
        """Pseudo-spectrum over 180 candidate angles (simplified MUSIC)."""
        angles = np.linspace(-np.pi / 2, np.pi / 2, 180)
        spectrum = np.zeros(len(angles))
        for i, angle in enumerate(angles):
            steering_vector = np.exp(1j * 2 * np.pi * np.arange(4) * np.sin(angle))
            projection = steering_vector.conj() @ noise_subspace @ noise_subspace.conj().T @ steering_vector
            # Fix: the projection is complex, and the original assigned it
            # straight into a float array (ComplexWarning / data loss).
            # Use its magnitude, with an epsilon against division by zero.
            spectrum[i] = 1.0 / (np.abs(projection) + 1e-12)
        return spectrum
Sensor Fusion Optimization
EKF Optimization
import numpy as np
from scipy.linalg import solve_discrete_lyapunov
class OptimizedEKF:
    """Kalman filter over a constant-velocity state [position, velocity].

    The state is 6-D (3 position + 3 velocity) by default and the
    measurements are 3-D positions observed directly.
    """

    def __init__(self, state_dim=6, measurement_dim=3):
        self.state_dim = state_dim
        self.measurement_dim = measurement_dim
        # State estimate and its covariance.
        self.x = np.zeros(state_dim)
        self.P = np.eye(state_dim) * 0.1
        # Process and measurement noise covariances.
        self.Q = np.eye(state_dim) * 0.01
        self.R = np.eye(measurement_dim) * 0.1

    def predict(self, dt):
        """Propagate the state ``dt`` seconds under constant velocity."""
        # Transition couples position to velocity: position += velocity*dt.
        transition = np.eye(self.state_dim)
        transition[:3, 3:6] = np.eye(3) * dt
        self.x = transition @ self.x
        self.P = transition @ self.P @ transition.T + self.Q

    def update(self, measurement):
        """Fuse one position measurement into the state estimate."""
        # Observation model: we measure the position components directly.
        observation = np.zeros((self.measurement_dim, self.state_dim))
        observation[:3, :3] = np.eye(3)
        # Innovation covariance and Kalman gain.
        innovation_cov = observation @ self.P @ observation.T + self.R
        gain = self.P @ observation.T @ np.linalg.inv(innovation_cov)
        # Correct state and covariance with the measurement residual.
        residual = measurement - observation @ self.x
        self.x = self.x + gain @ residual
        self.P = (np.eye(self.state_dim) - gain @ observation) @ self.P

    def get_pose(self):
        """Expose position, velocity, and covariance of the estimate."""
        return {
            'position': self.x[:3],
            'velocity': self.x[3:6],
            'covariance': self.P,
        }
🎨 Rendering Optimization
OpenGL Optimization
import OpenGL.GL as gl
import numpy as np
class OptimizedRenderer:
    """Minimal VAO/VBO-based OpenGL renderer with cached GL state."""

    def __init__(self):
        self.shader_program = None
        self.vao = None
        self.vbo = None
        self.ebo = None
        # Fix: render_frame reads index_count, but it was never initialized.
        self.index_count = 0
        self.setup_gl()

    def setup_gl(self):
        """Enable the fixed GL state used by every frame."""
        gl.glEnable(gl.GL_DEPTH_TEST)
        gl.glEnable(gl.GL_CULL_FACE)
        gl.glEnable(gl.GL_BLEND)
        gl.glBlendFunc(gl.GL_SRC_ALPHA, gl.GL_ONE_MINUS_SRC_ALPHA)
        gl.glClearColor(0.1, 0.1, 0.1, 1.0)

    def create_shader_program(self, vertex_source, fragment_source):
        """Compile and link a shader program; store and return it.

        Raises:
            RuntimeError: on compile or link failure (the original
                ignored errors and silently returned a broken program).
        """
        vertex_shader = self._compile_shader(gl.GL_VERTEX_SHADER, vertex_source)
        fragment_shader = self._compile_shader(gl.GL_FRAGMENT_SHADER, fragment_source)
        program = gl.glCreateProgram()
        gl.glAttachShader(program, vertex_shader)
        gl.glAttachShader(program, fragment_shader)
        gl.glLinkProgram(program)
        if not gl.glGetProgramiv(program, gl.GL_LINK_STATUS):
            raise RuntimeError(gl.glGetProgramInfoLog(program))
        # Shader objects are owned by the program once linked.
        gl.glDeleteShader(vertex_shader)
        gl.glDeleteShader(fragment_shader)
        # Fix: render_frame uses self.shader_program, which was never set.
        self.shader_program = program
        return program

    def _compile_shader(self, shader_type, source):
        """Compile one shader stage, raising on failure."""
        shader = gl.glCreateShader(shader_type)
        gl.glShaderSource(shader, source)
        gl.glCompileShader(shader)
        if not gl.glGetShaderiv(shader, gl.GL_COMPILE_STATUS):
            raise RuntimeError(gl.glGetShaderInfoLog(shader))
        return shader

    def setup_buffers(self, vertices, indices):
        """Upload interleaved position+colour vertex data and indices."""
        import ctypes  # Fix: ctypes was referenced below but never imported.
        self.vao = gl.glGenVertexArrays(1)
        gl.glBindVertexArray(self.vao)
        self.vbo = gl.glGenBuffers(1)
        gl.glBindBuffer(gl.GL_ARRAY_BUFFER, self.vbo)
        gl.glBufferData(gl.GL_ARRAY_BUFFER, vertices.nbytes, vertices, gl.GL_STATIC_DRAW)
        self.ebo = gl.glGenBuffers(1)
        gl.glBindBuffer(gl.GL_ELEMENT_ARRAY_BUFFER, self.ebo)
        gl.glBufferData(gl.GL_ELEMENT_ARRAY_BUFFER, indices.nbytes, indices, gl.GL_STATIC_DRAW)
        # Layout: 6 floats per vertex (stride 24 bytes) = position(3) + colour(3).
        gl.glVertexAttribPointer(0, 3, gl.GL_FLOAT, gl.GL_FALSE, 24, None)
        gl.glEnableVertexAttribArray(0)
        gl.glVertexAttribPointer(1, 3, gl.GL_FLOAT, gl.GL_FALSE, 24, ctypes.c_void_p(12))
        gl.glEnableVertexAttribArray(1)
        # Fix: record how many indices render_frame must draw.
        self.index_count = indices.size

    def render_frame(self, pose_data):
        """Clear, update matrices, and draw the indexed mesh."""
        gl.glClear(gl.GL_COLOR_BUFFER_BIT | gl.GL_DEPTH_BUFFER_BIT)
        gl.glUseProgram(self.shader_program)
        self.update_matrices(pose_data)
        gl.glBindVertexArray(self.vao)
        gl.glDrawElements(gl.GL_TRIANGLES, self.index_count, gl.GL_UNSIGNED_INT, None)

    def update_matrices(self, pose_data):
        """Compute and upload the view/projection uniform matrices.

        NOTE(review): calculate_view_matrix / calculate_projection_matrix
        and the ``*_location`` uniform handles are defined elsewhere —
        confirm they exist before using this class standalone.
        """
        view_matrix = self.calculate_view_matrix(pose_data)
        projection_matrix = self.calculate_projection_matrix()
        gl.glUniformMatrix4fv(self.view_location, 1, gl.GL_FALSE, view_matrix)
        gl.glUniformMatrix4fv(self.projection_location, 1, gl.GL_FALSE, projection_matrix)
NeRF Rendering Optimization
import torch
import torch.nn as nn
class OptimizedNeRFRenderer:
    """Chunked NeRF ray renderer for a pre-trained model.

    Rays are processed in fixed-size chunks under ``torch.no_grad`` to
    bound peak GPU memory, then composited with classic volume rendering.
    """

    def __init__(self, model_path, device='cuda'):
        self.device = device
        self.model = self.load_model(model_path)
        self.model.to(device)
        self.model.eval()
        # Optimization settings: chunk_size bounds peak memory per batch,
        # num_samples is the number of depth samples per ray.
        self.chunk_size = 4096
        self.num_samples = 64

    def load_model(self, model_path):
        """Load the serialized NeRF model from ``model_path``.

        NOTE(review): ``torch.load`` unpickles arbitrary objects — only
        load checkpoints from trusted sources. Also assumes the file
        holds a full ``nn.Module`` (``.to``/``.eval`` are called on the
        result), not a bare state_dict — TODO confirm.
        """
        model = torch.load(model_path, map_location=self.device)
        return model

    @torch.no_grad()
    def render_rays(self, rays_o, rays_d, near, far):
        """Render rays (origins ``rays_o``, directions ``rays_d``) in chunks.

        Assumes rays_o/rays_d are (N, 3) tensors — TODO confirm caller.
        """
        # Chunking keeps peak memory proportional to chunk_size, not N.
        outputs = []
        for i in range(0, rays_o.shape[0], self.chunk_size):
            chunk_o = rays_o[i:i+self.chunk_size]
            chunk_d = rays_d[i:i+self.chunk_size]
            chunk_output = self._render_chunk(chunk_o, chunk_d, near, far)
            outputs.append(chunk_output)
        # Reassemble per-chunk results in original ray order.
        return torch.cat(outputs, dim=0)

    def _render_chunk(self, rays_o, rays_d, near, far):
        """Render one chunk: sample depths, query the model, composite."""
        # Uniform depth samples in [near, far], shared by every ray.
        t_vals = torch.linspace(0., 1., self.num_samples, device=self.device)
        z_vals = near * (1. - t_vals) + far * t_vals
        # Broadcast the shared depths to one row per ray.
        z_vals = z_vals.unsqueeze(0).expand(rays_o.shape[0], -1)
        # 3-D sample positions: origin + direction * depth.
        pts = rays_o.unsqueeze(1) + rays_d.unsqueeze(1) * z_vals.unsqueeze(-1)
        # Query the network for colour and density at each sample.
        rgb, sigma = self.model(pts, rays_d)
        rgb_final = self._volume_render(rgb, sigma, z_vals)
        return rgb_final

    def _volume_render(self, rgb, sigma, z_vals):
        """Alpha-composite colours along each ray (standard NeRF quadrature)."""
        # Distances between consecutive samples; the final segment gets a
        # huge sentinel so the last sample absorbs all remaining density.
        dists = z_vals[..., 1:] - z_vals[..., :-1]
        dists = torch.cat([dists, torch.tensor([1e10], device=self.device).expand(dists[..., :1].shape)], -1)
        # Per-sample opacity from density and segment length.
        alpha = 1. - torch.exp(-sigma * dists)
        # Transmittance-weighted contributions; the 1e-10 keeps the
        # cumulative product of survival probabilities away from zero.
        weights = alpha * torch.cumprod(torch.cat([torch.ones((alpha.shape[0], 1), device=self.device), 1.-alpha + 1e-10], -1), -1)[:, :-1]
        # Expected colour per ray.
        rgb_final = torch.sum(weights.unsqueeze(-1) * rgb, -2)
        return rgb_final
🔧 Configuration Optimization
Performance Configuration
{
"performance": {
"target_latency": 20,
"target_fps": 30,
"target_accuracy": 10,
"max_cpu_usage": 80,
"max_gpu_usage": 90,
"max_memory_usage": 80
},
"processing": {
"vision_slam": {
"max_features": 1000,
"min_features": 100,
"update_rate": 30,
"quality_level": 0.01
},
"rf_slam": {
"packet_rate": 100,
"aoa_estimation": "music",
"filter_window": 10
},
"sensor_fusion": {
"fusion_method": "ekf",
"vision_weight": 0.7,
"rf_weight": 0.3,
"process_noise": 0.01
}
},
"rendering": {
"quality": "high",
"resolution": [1280, 720],
"vsync": true,
"antialiasing": true,
"shadow_quality": "medium"
},
"optimization": {
"use_gpu": true,
"use_multithreading": true,
"memory_pooling": true,
"chunk_processing": true
}
}
Dynamic Configuration
class DynamicConfigManager:
    """Adapt processing/rendering settings to live performance metrics."""

    def __init__(self, base_config):
        import copy  # local import: only needed here
        self.base_config = base_config
        # Fix: dict.copy() is shallow, so every tuning step also mutated
        # the nested dicts of base_config; deepcopy keeps the baseline
        # pristine for later resets/comparisons.
        self.current_config = copy.deepcopy(base_config)
        self.performance_monitor = PerformanceMonitor()

    def optimize_config(self):
        """Apply one round of adjustments based on the average metrics."""
        metrics = self.performance_monitor.get_average_metrics()
        if metrics.get('latency', 0) > 25:
            self._reduce_processing_load()
        if metrics.get('fps', 0) < 25:
            self._reduce_rendering_quality()
        if metrics.get('cpu_usage', 0) > 85:
            self._reduce_thread_count()
        if metrics.get('memory_usage', 0) > 85:
            self._reduce_buffer_sizes()

    def _reduce_processing_load(self):
        """Step the vision-SLAM feature budget and update rate down (with floors)."""
        vision = self.current_config['processing']['vision_slam']
        vision['max_features'] = max(500, vision['max_features'] - 100)
        vision['update_rate'] = max(20, vision['update_rate'] - 5)

    def _reduce_rendering_quality(self):
        """Drop rendering quality one level (high -> medium -> low)."""
        quality_levels = ['high', 'medium', 'low']
        current_index = quality_levels.index(self.current_config['rendering']['quality'])
        if current_index < len(quality_levels) - 1:
            self.current_config['rendering']['quality'] = quality_levels[current_index + 1]

    def _reduce_thread_count(self):
        """Reduce worker thread count (not yet implemented)."""
        pass

    def _reduce_buffer_sizes(self):
        """Shrink internal buffer sizes (not yet implemented)."""
        pass
📊 Performance Testing
Benchmark Suite
import time
import statistics
class PerformanceBenchmark:
    """Latency / throughput / memory benchmarking with report generation."""

    def __init__(self):
        # Most recent benchmark results, keyed by component name.
        self.results = {}

    def benchmark_latency(self, func, *args, iterations=100):
        """Time ``iterations`` calls of ``func`` and summarize in ms.

        Returns a dict with mean/median/std/min/max plus p95/p99.
        Fix: the original crashed with fewer than two iterations —
        ``statistics.stdev`` and ``statistics.quantiles`` both require at
        least 2 samples; those now degrade gracefully.
        """
        times = []
        for _ in range(iterations):
            start_time = time.perf_counter()
            func(*args)
            end_time = time.perf_counter()
            times.append((end_time - start_time) * 1000)  # convert to ms
        multi = len(times) > 1
        return {
            'mean': statistics.mean(times),
            'median': statistics.median(times),
            'std': statistics.stdev(times) if multi else 0.0,
            'min': min(times),
            'max': max(times),
            'p95': statistics.quantiles(times, n=20)[18] if multi else times[0],
            'p99': statistics.quantiles(times, n=100)[98] if multi else times[0],
        }

    def benchmark_throughput(self, func, *args, duration=10):
        """Call ``func`` repeatedly for ``duration`` seconds; return ops/sec."""
        start_time = time.perf_counter()
        count = 0
        while time.perf_counter() - start_time < duration:
            func(*args)
            count += 1
        return count / duration

    def benchmark_memory(self, func, *args):
        """Return the RSS growth (bytes) caused by one call of ``func``."""
        import psutil
        import gc
        # Settle the heap so the baseline reading is stable.
        gc.collect()
        process = psutil.Process()
        initial_memory = process.memory_info().rss
        func(*args)
        final_memory = process.memory_info().rss
        return final_memory - initial_memory

    def run_full_benchmark(self):
        """Benchmark every pipeline stage plus the end-to-end path.

        NOTE(review): the ``*_test`` callables are not defined on this
        class — they must be supplied by a subclass or attached
        externally before calling this method.
        """
        benchmark_results = {}
        benchmark_results['camera_capture'] = self.benchmark_latency(self.camera_capture_test)
        benchmark_results['csi_processing'] = self.benchmark_latency(self.csi_processing_test)
        benchmark_results['slam_processing'] = self.benchmark_latency(self.slam_processing_test)
        benchmark_results['rendering'] = self.benchmark_latency(self.rendering_test)
        benchmark_results['end_to_end'] = self.benchmark_latency(self.end_to_end_test)
        return benchmark_results

    def generate_report(self, results):
        """Summarize per-component results into a grade + recommendations."""
        return {
            'summary': {
                'total_latency': sum(r['mean'] for r in results.values()),
                'bottleneck': max(results.items(), key=lambda x: x[1]['mean'])[0],
                'performance_grade': self._calculate_grade(results),
            },
            'details': results,
            'recommendations': self._generate_recommendations(results),
        }

    def _calculate_grade(self, results):
        """Letter grade from the total mean latency (A < 20 ms ... D >= 40 ms)."""
        total_latency = sum(r['mean'] for r in results.values())
        if total_latency < 20:
            return 'A'
        elif total_latency < 30:
            return 'B'
        elif total_latency < 40:
            return 'C'
        else:
            return 'D'

    def _generate_recommendations(self, results):
        """Flag slow components and high tail-latency variance."""
        recommendations = []
        for component, metrics in results.items():
            if metrics['mean'] > 10:  # high mean latency threshold (ms)
                recommendations.append(f"Optimize {component} - current latency: {metrics['mean']:.2f}ms")
            if metrics['p99'] > metrics['mean'] * 2:  # heavy tail
                recommendations.append(f"Reduce variance in {component} - p99: {metrics['p99']:.2f}ms")
        return recommendations
🚀 Deployment Optimization
Production Configuration
# docker-compose.prod.yml
version: '3.8'
services:
nowyouseeme:
build:
context: .
dockerfile: Dockerfile
target: production
container_name: nowyouseeme-prod
ports:
- "8080:8080"
volumes:
- ./config:/app/config:ro
- ./data:/app/data
- ./logs:/app/logs
environment:
- PYTHONPATH=/app/src
- NOWYOUSEE_DEBUG=0
- CUDA_VISIBLE_DEVICES=0
- OMP_NUM_THREADS=4
- MKL_NUM_THREADS=4
devices:
- /dev/video0:/dev/video0
- /dev/bus/usb:/dev/bus/usb
network_mode: host
restart: unless-stopped
deploy:
resources:
limits:
cpus: '4.0'
memory: 8G
reservations:
cpus: '2.0'
memory: 4G
healthcheck:
test: ["CMD", "python3", "-c", "import sys; sys.exit(0)"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
Monitoring Setup
# monitoring.py
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
class PerformanceMetrics:
    """Prometheus exporters for the system's key performance signals."""

    def __init__(self):
        # Histogram buckets chosen around the 20 ms latency target.
        self.latency_histogram = Histogram(
            'nowyouseeme_latency_seconds',
            'End-to-end latency in seconds',
            buckets=[0.01, 0.02, 0.03, 0.05, 0.1, 0.2, 0.5],
        )
        self.fps_gauge = Gauge('nowyouseeme_fps', 'Current frame rate')
        self.accuracy_gauge = Gauge('nowyouseeme_accuracy_cm', 'Current tracking accuracy in cm')
        self.cpu_usage_gauge = Gauge('nowyouseeme_cpu_usage_percent', 'CPU usage percentage')
        self.gpu_usage_gauge = Gauge('nowyouseeme_gpu_usage_percent', 'GPU usage percentage')
        self.memory_usage_gauge = Gauge('nowyouseeme_memory_usage_percent', 'Memory usage percentage')

    def record_latency(self, latency_ms):
        """Observe one end-to-end latency sample (input in milliseconds)."""
        self.latency_histogram.observe(latency_ms / 1000.0)

    def record_fps(self, fps):
        """Publish the current frame rate."""
        self.fps_gauge.set(fps)

    def record_accuracy(self, accuracy_cm):
        """Publish the current tracking accuracy in centimetres."""
        self.accuracy_gauge.set(accuracy_cm)

    def record_system_metrics(self, cpu_percent, gpu_percent, memory_percent):
        """Publish CPU/GPU/memory utilization in one call."""
        self.cpu_usage_gauge.set(cpu_percent)
        self.gpu_usage_gauge.set(gpu_percent)
        self.memory_usage_gauge.set(memory_percent)
# Start metrics server
if __name__ == '__main__':
prometheus_client.start_http_server(8000)
For more detailed optimization strategies, see:
- Architecture Guide - System design and optimization
- Troubleshooting Guide - Performance issue resolution
- API Reference - Performance-related API calls