#!/usr/bin/env python3
"""
CCIP Monitor Service

Monitors Chainlink CCIP message flow, tracks latency, fees, and alerts on failures.

NOTE(review): in this file only event counting, the last-seen block gauge and
the health/RPC status gauges are actually updated. The fee and latency
histograms are declared but never observed, and ALERT_WEBHOOK is read from the
environment but never used.
"""

import os
import sys
import time
import json
import logging
from typing import Optional, Dict, Any, List, Tuple
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread

try:
    from web3 import Web3
    from prometheus_client import Counter, Gauge, Histogram, start_http_server
except ImportError as e:
    print(f"Error importing dependencies: {e}")
    print("Please install dependencies: pip install web3 prometheus-client")
    # sys.exit is always available; the bare `exit` helper is injected by the
    # `site` module and may be missing (e.g. `python -S`, frozen builds).
    sys.exit(1)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ccip-monitor')

# Load configuration from environment
RPC_URL = os.getenv('RPC_URL_138', 'http://192.168.11.250:8545')
CCIP_ROUTER_ADDRESS = os.getenv('CCIP_ROUTER_ADDRESS', '')
CCIP_SENDER_ADDRESS = os.getenv('CCIP_SENDER_ADDRESS', '')
LINK_TOKEN_ADDRESS = os.getenv('LINK_TOKEN_ADDRESS', '')
# The health endpoint listens on METRICS_PORT; the Prometheus exporter is
# started on METRICS_PORT + 1 (see main()).
METRICS_PORT = int(os.getenv('METRICS_PORT', '8000'))
CHECK_INTERVAL = int(os.getenv('CHECK_INTERVAL', '60'))
ALERT_WEBHOOK = os.getenv('ALERT_WEBHOOK', '')

# Number of blocks scanned on the very first poll.
INITIAL_LOOKBACK_BLOCKS = 100
# Upper bound on the number of blocks scanned in a single poll.
MAX_LOOKBACK_BLOCKS = 1000
# Seconds to wait after a failed reconnect or an unexpected loop error.
RETRY_DELAY_SECONDS = 30

# Prometheus metrics
ccip_messages_total = Counter('ccip_messages_total', 'Total CCIP messages processed', ['event'])
ccip_message_fees = Histogram('ccip_message_fees', 'CCIP message fees', buckets=[0.001, 0.01, 0.1, 1, 10, 100])
ccip_message_latency = Histogram('ccip_message_latency_seconds', 'CCIP message latency in seconds', buckets=[1, 5, 10, 30, 60, 300, 600])
ccip_last_block = Gauge('ccip_last_block', 'Last processed block number')
ccip_service_status = Gauge('ccip_service_status', 'Service status (1=healthy, 0=unhealthy)')
ccip_rpc_connected = Gauge('ccip_rpc_connected', 'RPC connection status (1=connected, 0=disconnected)')

# Initialize Web3
w3 = None
last_processed_block = 0

# CCIP Router ABI (minimal - for event monitoring)
# NOTE: not referenced by the functions in this file, which query logs by raw
# topic hash instead; kept for callers that import it.
CCIP_ROUTER_ABI = [
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "messageId", "type": "bytes32"},
            {"indexed": True, "name": "sourceChainSelector", "type": "uint64"},
            {"indexed": False, "name": "sender", "type": "address"},
            {"indexed": False, "name": "data", "type": "bytes"},
            {"indexed": False, "name": "tokenAmounts", "type": "tuple[]"},
            {"indexed": False, "name": "feeToken", "type": "address"},
            {"indexed": False, "name": "extraArgs", "type": "bytes"}
        ],
        "name": "MessageSent",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "messageId", "type": "bytes32"},
            {"indexed": True, "name": "sourceChainSelector", "type": "uint64"},
            {"indexed": False, "name": "sender", "type": "address"},
            {"indexed": False, "name": "data", "type": "bytes"}
        ],
        "name": "MessageExecuted",
        "type": "event"
    }
]

# Canonical event signatures whose keccak hash is used as topic0 in get_logs.
# The dict key doubles as the `event` label on ccip_messages_total.
_EVENT_SIGNATURES: Dict[str, str] = {
    'MessageSent': "MessageSent(bytes32,uint64,address,bytes,(address,uint256)[],address,bytes)",
    'MessageExecuted': "MessageExecuted(bytes32,uint64,address,bytes)",
}


class HealthCheckHandler(BaseHTTPRequestHandler):
    """HTTP handler for health check endpoint"""

    def _probe_rpc(self) -> Tuple[int, Dict[str, Any]]:
        """Check the RPC connection and update the status gauges.

        Returns:
            (HTTP status code, JSON-serialisable payload). 200 when the RPC
            node answers, 503 otherwise.

        Raises:
            Whatever the web3 provider raises while querying the node.
        """
        if w3 is not None and w3.is_connected():
            block_number = w3.eth.block_number
            ccip_rpc_connected.set(1)
            ccip_service_status.set(1)
            return 200, {
                'status': 'healthy',
                'rpc_connected': True,
                'block_number': block_number,
                'ccip_router': CCIP_ROUTER_ADDRESS,
                'ccip_sender': CCIP_SENDER_ADDRESS
            }

        ccip_rpc_connected.set(0)
        ccip_service_status.set(0)
        return 503, {'status': 'unhealthy', 'rpc_connected': False}

    def _send_json(self, status_code: int, payload: Dict[str, Any]) -> None:
        """Write *payload* as a JSON response with the given status code."""
        body = json.dumps(payload).encode()
        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Answer any GET with the current RPC health as JSON (200 or 503)."""
        # Probe first, write afterwards: a failure while writing must not
        # trigger a second response on a connection whose headers are
        # already sent.
        try:
            status_code, payload = self._probe_rpc()
        except Exception as e:
            logger.error(f"Health check error: {e}")
            ccip_service_status.set(0)
            status_code, payload = 503, {'status': 'error', 'error': str(e)}

        try:
            self._send_json(status_code, payload)
        except OSError as e:
            logger.warning(f"Could not send health check response: {e}")

    def log_message(self, format, *args):
        # Suppress default logging for health checks
        pass


def init_web3() -> bool:
    """Initialize Web3 connection

    Rebinds the module-level ``w3`` to a fresh HTTP provider for RPC_URL.

    Returns:
        True when the node is reachable and answered the chain id and block
        number queries, False otherwise (the error is logged).
    """
    global w3

    try:
        logger.info(f"Connecting to RPC: {RPC_URL}")
        w3 = Web3(Web3.HTTPProvider(RPC_URL, request_kwargs={'timeout': 30}))

        if not w3.is_connected():
            logger.error(f"Failed to connect to RPC: {RPC_URL}")
            return False

        chain_id = w3.eth.chain_id
        block_number = w3.eth.block_number
        logger.info(f"Connected to chain {chain_id}, current block: {block_number}")
        return True
    except Exception as e:
        logger.error(f"Error initializing Web3: {e}")
        return False


def _compute_block_range(last_processed: int, current_block: int) -> Tuple[int, int]:
    """Return the inclusive (from_block, to_block) range to scan.

    Scans from the block after *last_processed*, or the last
    INITIAL_LOOKBACK_BLOCKS blocks on the first poll, never reaching back
    more than MAX_LOOKBACK_BLOCKS and never below block 0.
    """
    if last_processed > 0:
        start = last_processed + 1
    else:
        start = current_block - INITIAL_LOOKBACK_BLOCKS
    # Clamp to 0: on a chain shorter than the lookback the subtraction above
    # would otherwise produce a negative block number.
    start = max(start, current_block - MAX_LOOKBACK_BLOCKS, 0)
    return start, current_block


def _format_tx_hash(tx_hash: Any) -> str:
    """Render a transaction hash as text, 0x-prefixed when it is raw bytes."""
    if isinstance(tx_hash, (bytes, bytearray)):
        return Web3.to_hex(tx_hash)
    return str(tx_hash)


def _fetch_event_logs(router_address: str, signature: str, from_block: int, to_block: int) -> List[Any]:
    """Fetch router logs whose topic0 matches *signature* in the block range.

    Raises:
        Whatever ``w3.eth.get_logs`` raises (RPC errors, timeouts, ...).
    """
    # Web3.to_hex always yields a 0x-prefixed string; HexBytes.hex() dropped
    # the prefix in hexbytes 1.0, which web3.py 7.x depends on.
    topic = Web3.to_hex(Web3.keccak(text=signature))
    filter_params = {
        "fromBlock": from_block,
        "toBlock": to_block,
        "address": router_address,
        "topics": [topic]
    }
    return w3.eth.get_logs(filter_params)


def monitor_ccip_events():
    """Monitor CCIP Router events

    Polls the router for MessageSent / MessageExecuted logs since the last
    processed block, increments ``ccip_messages_total`` per log and advances
    the module-level ``last_processed_block``. The cursor only advances when
    every log query succeeded, so a failed poll is retried on the next call
    instead of silently dropping events.
    """
    global last_processed_block

    if w3 is None or not w3.is_connected():
        logger.warning("Web3 not connected, skipping event monitoring")
        return

    try:
        # Get current block
        current_block = w3.eth.block_number
        ccip_last_block.set(current_block)

        if not CCIP_ROUTER_ADDRESS:
            logger.warning("CCIP_ROUTER_ADDRESS not configured, skipping event monitoring")
            return

        try:
            router_address = Web3.to_checksum_address(CCIP_ROUTER_ADDRESS)
        except (ValueError, TypeError) as e:
            logger.error(f"Invalid CCIP_ROUTER_ADDRESS {CCIP_ROUTER_ADDRESS!r}: {e}")
            return

        from_block, to_block = _compute_block_range(last_processed_block, current_block)
        if from_block > to_block:
            # No new blocks since the previous poll.
            return

        if last_processed_block > 0 and from_block > last_processed_block + 1:
            logger.warning(
                f"Skipping blocks {last_processed_block + 1} to {from_block - 1}: "
                f"lookback capped at {MAX_LOOKBACK_BLOCKS} blocks"
            )

        logger.debug(f"Checking blocks {from_block} to {to_block} for CCIP events")

        # Fetch everything before counting anything, so that a failure on one
        # event type cannot lead to double counting the other on retry.
        try:
            logs_by_event = {
                event_name: _fetch_event_logs(router_address, signature, from_block, to_block)
                for event_name, signature in _EVENT_SIGNATURES.items()
            }
        except Exception as e:
            logger.error(f"Error monitoring CCIP events in blocks {from_block} to {to_block}: {e}")
            return

        for event_name, logs in logs_by_event.items():
            for log in logs:
                tx_hash = log.get('transactionHash')
                tx_hash_str = _format_tx_hash(tx_hash) if tx_hash else 'unknown'
                logger.info(f"CCIP {event_name} event detected: {tx_hash_str}")
                ccip_messages_total.labels(event=event_name).inc()

        last_processed_block = to_block

    except Exception as e:
        logger.error(f"Error in monitor_ccip_events: {e}")


def start_health_server():
    """Start HTTP server for health checks

    Blocks serving requests on METRICS_PORT; intended to run in a daemon
    thread. Startup and serving errors are logged rather than raised.
    """
    try:
        server = HTTPServer(('0.0.0.0', METRICS_PORT), HealthCheckHandler)
        logger.info(f"Health check server started on port {METRICS_PORT}")
        server.serve_forever()
    except Exception as e:
        logger.error(f"Error starting health server: {e}")


def main():
    """Main function

    Connects to the RPC node, starts the Prometheus exporter and the health
    endpoint, then polls for CCIP events every CHECK_INTERVAL seconds until
    interrupted. Exits with status 1 if the initial connection fails.
    """
    logger.info("Starting CCIP Monitor Service")
    logger.info(f"RPC URL: {RPC_URL}")
    logger.info(f"CCIP Router: {CCIP_ROUTER_ADDRESS}")
    logger.info(f"CCIP Sender: {CCIP_SENDER_ADDRESS}")
    logger.info(f"Metrics Port: {METRICS_PORT}")
    logger.info(f"Check Interval: {CHECK_INTERVAL} seconds")

    # Initialize Web3
    if not init_web3():
        logger.error("Failed to initialize Web3, exiting")
        sys.exit(1)

    # Start Prometheus metrics server
    try:
        start_http_server(METRICS_PORT + 1)
        logger.info(f"Prometheus metrics server started on port {METRICS_PORT + 1}")
    except Exception as e:
        logger.warning(f"Could not start Prometheus metrics server: {e}")

    # Start health check server in separate thread
    health_thread = Thread(target=start_health_server, daemon=True)
    health_thread.start()

    # Main monitoring loop
    logger.info("Starting monitoring loop")
    while True:
        try:
            # Check Web3 connection
            if not w3.is_connected():
                logger.warning("Web3 connection lost, attempting to reconnect...")
                if not init_web3():
                    logger.error("Failed to reconnect to RPC")
                    ccip_rpc_connected.set(0)
                    time.sleep(RETRY_DELAY_SECONDS)
                    continue

            ccip_rpc_connected.set(1)

            # Monitor CCIP events
            monitor_ccip_events()

            # Sleep until next check
            time.sleep(CHECK_INTERVAL)

        except KeyboardInterrupt:
            logger.info("Received interrupt signal, shutting down...")
            break
        except Exception as e:
            logger.error(f"Error in main loop: {e}", exc_info=True)
            time.sleep(RETRY_DELAY_SECONDS)


if __name__ == '__main__':
    main()