307 lines
12 KiB
Python
307 lines
12 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
CCIP Monitor Service
|
||
|
|
Monitors Chainlink CCIP message flow, tracks latency, fees, and alerts on failures.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import time
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
from typing import Optional, Dict, Any
|
||
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
|
|
from threading import Thread
|
||
|
|
|
||
|
|
try:
|
||
|
|
from web3 import Web3
|
||
|
|
from prometheus_client import Counter, Gauge, Histogram, start_http_server
|
||
|
|
except ImportError as e:
|
||
|
|
print(f"Error importing dependencies: {e}")
|
||
|
|
print("Please install dependencies: pip install web3 prometheus-client")
|
||
|
|
exit(1)
|
||
|
|
|
||
|
|
# Root logging configuration: INFO and above, one
# "time - logger name - level - message" line per record.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Named logger used by the functions in this module.
logger = logging.getLogger('ccip-monitor')
|
||
|
|
|
||
|
|
# ---- Configuration: read once, at import time, from environment variables ----
# JSON-RPC endpoint of the chain being monitored.
RPC_URL = os.environ.get('RPC_URL_138', 'http://192.168.11.250:8545')
# Contract addresses; each falls back to an empty string when unset.
CCIP_ROUTER_ADDRESS = os.environ.get('CCIP_ROUTER_ADDRESS', '')
CCIP_SENDER_ADDRESS = os.environ.get('CCIP_SENDER_ADDRESS', '')
LINK_TOKEN_ADDRESS = os.environ.get('LINK_TOKEN_ADDRESS', '')
# Port for the JSON health server (main() puts the Prometheus exporter on +1).
METRICS_PORT = int(os.environ.get('METRICS_PORT', '8000'))
# Seconds to sleep between monitoring passes.
CHECK_INTERVAL = int(os.environ.get('CHECK_INTERVAL', '60'))
# Alert webhook URL; empty string when unset.
ALERT_WEBHOOK = os.environ.get('ALERT_WEBHOOK', '')
|
||
|
|
|
||
|
|
# ---- Prometheus metrics (served by start_http_server() in main()) ----
# CCIP events seen, labelled by event name (e.g. MessageSent).
ccip_messages_total = Counter(
    name='ccip_messages_total',
    documentation='Total CCIP messages processed',
    labelnames=['event'],
)
# Fee and latency histograms; not observed by the code in this file.
ccip_message_fees = Histogram(
    name='ccip_message_fees',
    documentation='CCIP message fees',
    buckets=[0.001, 0.01, 0.1, 1, 10, 100],
)
ccip_message_latency = Histogram(
    name='ccip_message_latency_seconds',
    documentation='CCIP message latency in seconds',
    buckets=[1, 5, 10, 30, 60, 300, 600],
)
# Chain head observed at the start of the latest monitoring pass.
ccip_last_block = Gauge(
    name='ccip_last_block',
    documentation='Last processed block number',
)
# 1/0 health flags, updated by the health handler and the main loop.
ccip_service_status = Gauge(
    name='ccip_service_status',
    documentation='Service status (1=healthy, 0=unhealthy)',
)
ccip_rpc_connected = Gauge(
    name='ccip_rpc_connected',
    documentation='RPC connection status (1=connected, 0=disconnected)',
)
|
||
|
|
|
||
|
|
# ---- Mutable module state (rebound via `global` by the functions below) ----
# Web3 client; None until init_web3() first runs.
w3: "Optional[Web3]" = None
# Highest block already scanned for CCIP events; 0 means "nothing scanned yet".
last_processed_block: int = 0
|
||
|
|
|
||
|
|
# CCIP Router ABI (minimal - for event monitoring).
# NOTE: monitor_ccip_events() filters logs by topic hashes computed from
# hard-coded signature strings, not from this ABI; keep the two in sync.
CCIP_ROUTER_ABI = [
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "messageId", "type": "bytes32"},
            {"indexed": True, "name": "sourceChainSelector", "type": "uint64"},
            {"indexed": False, "name": "sender", "type": "address"},
            {"indexed": False, "name": "data", "type": "bytes"},
            # A "tuple" type is only valid ABI when its components are listed;
            # these make the canonical type `(address,uint256)[]`.
            # Component names follow CCIP's EVMTokenAmount struct - confirm
            # against the deployed router.
            {
                "indexed": False,
                "name": "tokenAmounts",
                "type": "tuple[]",
                "components": [
                    {"name": "token", "type": "address"},
                    {"name": "amount", "type": "uint256"},
                ],
            },
            {"indexed": False, "name": "feeToken", "type": "address"},
            {"indexed": False, "name": "extraArgs", "type": "bytes"},
        ],
        "name": "MessageSent",
        "type": "event",
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "messageId", "type": "bytes32"},
            {"indexed": True, "name": "sourceChainSelector", "type": "uint64"},
            {"indexed": False, "name": "sender", "type": "address"},
            {"indexed": False, "name": "data", "type": "bytes"},
        ],
        "name": "MessageExecuted",
        "type": "event",
    },
]
|
||
|
|
|
||
|
|
|
||
|
|
class HealthCheckHandler(BaseHTTPRequestHandler):
    """HTTP handler that answers every GET with a JSON health report."""

    def do_GET(self):
        """Report RPC health: 200 when connected, 503 when not or on error.

        Also sets the ``ccip_rpc_connected`` / ``ccip_service_status`` gauges
        to mirror the result. The full response is decided before anything is
        written, so a failure can no longer emit a second, conflicting status
        line into a response whose headers were already sent.
        """
        try:
            if w3 and w3.is_connected():
                block_number = w3.eth.block_number
                ccip_rpc_connected.set(1)
                ccip_service_status.set(1)
                status = 200
                response = {
                    'status': 'healthy',
                    'rpc_connected': True,
                    'block_number': block_number,
                    'ccip_router': CCIP_ROUTER_ADDRESS,
                    'ccip_sender': CCIP_SENDER_ADDRESS,
                }
            else:
                ccip_rpc_connected.set(0)
                ccip_service_status.set(0)
                status = 503
                response = {'status': 'unhealthy', 'rpc_connected': False}
        except Exception as e:
            logger.error(f"Health check error: {e}")
            ccip_service_status.set(0)
            status = 503
            response = {'status': 'error', 'error': str(e)}

        self._send_json(status, response)

    def _send_json(self, status, payload):
        """Send *payload* serialized as JSON with HTTP status *status*."""
        # Serialize first so an encoding error cannot leave a half-sent response.
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Suppress BaseHTTPRequestHandler's default per-request stderr logging.
        pass
|
||
|
|
|
||
|
|
|
||
|
|
def init_web3() -> bool:
    """(Re)create the module-level Web3 client for RPC_URL and probe it.

    The global ``w3`` is rebound as soon as the client object is built, even
    if the node turns out to be unreachable.

    Returns:
        True if the node is reachable and answered the chain-id and
        block-number queries; False if it is unreachable or any step raised.
    """
    global w3

    try:
        logger.info(f"Connecting to RPC: {RPC_URL}")
        provider = Web3.HTTPProvider(RPC_URL, request_kwargs={'timeout': 30})
        w3 = Web3(provider)

        if w3.is_connected():
            chain_id, head = w3.eth.chain_id, w3.eth.block_number
            logger.info(f"Connected to chain {chain_id}, current block: {head}")
            return True

        logger.error(f"Failed to connect to RPC: {RPC_URL}")
        return False
    except Exception as e:
        logger.error(f"Error initializing Web3: {e}")
        return False
|
||
|
|
|
||
|
|
|
||
|
|
# Canonical event signatures; keccak256 of each is the log's topic0.
_MESSAGE_SENT_SIGNATURE = "MessageSent(bytes32,uint64,address,bytes,(address,uint256)[],address,bytes)"
_MESSAGE_EXECUTED_SIGNATURE = "MessageExecuted(bytes32,uint64,address,bytes)"

# Blocks scanned on the very first pass, and the furthest back a pass may reach.
_INITIAL_LOOKBACK_BLOCKS = 100
_MAX_LOOKBACK_BLOCKS = 1000


def _tx_hash_to_hex(tx_hash) -> str:
    """Return a 0x-prefixed hex string for a log's ``transactionHash``."""
    if isinstance(tx_hash, (bytes, bytearray)):
        # Web3.to_hex always adds "0x"; HexBytes.hex() does not on hexbytes>=1.0.
        return Web3.to_hex(tx_hash)
    return str(tx_hash)


def _scan_event(event_name, signature, router, from_block, to_block) -> bool:
    """Count logs of one event type emitted by *router* in [from_block, to_block].

    Increments ``ccip_messages_total{event=event_name}`` once per log.

    Returns:
        True if the log query succeeded (even with zero matches), False if the
        RPC call failed (the error is logged at WARNING).
    """
    # 0x-prefixed topic string: JSON-RPC nodes reject topics without the prefix.
    topic = Web3.to_hex(Web3.keccak(text=signature))
    try:
        logs = w3.eth.get_logs({
            "fromBlock": from_block,
            "toBlock": to_block,
            "address": router,
            "topics": [topic],
        })
    except Exception as e:
        logger.warning(
            f"Failed to fetch {event_name} logs for blocks {from_block}-{to_block}: {e}"
        )
        return False

    for log in logs:
        tx_hash = log.get('transactionHash')
        if tx_hash:
            logger.info(f"CCIP {event_name} event detected: {_tx_hash_to_hex(tx_hash)}")
        ccip_messages_total.labels(event=event_name).inc()
    return True


def monitor_ccip_events():
    """Scan the CCIP router for MessageSent / MessageExecuted logs since the last pass.

    Range scanned:
      * first pass: the last _INITIAL_LOOKBACK_BLOCKS blocks;
      * later passes: from ``last_processed_block + 1``;
    in both cases no further back than _MAX_LOOKBACK_BLOCKS from the head and
    never below block 0.

    ``last_processed_block`` only advances when both log queries succeed, so a
    transient RPC error causes the range to be retried next pass instead of
    being skipped. All errors are logged; nothing is raised to the caller.
    """
    global last_processed_block

    if not w3 or not w3.is_connected():
        logger.warning("Web3 not connected, skipping event monitoring")
        return

    try:
        current_block = w3.eth.block_number
        ccip_last_block.set(current_block)

        if not CCIP_ROUTER_ADDRESS:
            logger.warning("CCIP_ROUTER_ADDRESS not configured, skipping event monitoring")
            return

        try:
            router = Web3.to_checksum_address(CCIP_ROUTER_ADDRESS)

            if last_processed_block > 0:
                start = last_processed_block + 1
            else:
                start = current_block - _INITIAL_LOOKBACK_BLOCKS
            oldest_allowed = current_block - _MAX_LOOKBACK_BLOCKS
            if last_processed_block > 0 and start < oldest_allowed:
                logger.warning(
                    f"Fell behind by more than {_MAX_LOOKBACK_BLOCKS} blocks; "
                    f"skipping blocks {start} to {oldest_allowed - 1}"
                )
            # Clamp at 0: on a young chain current_block - N can be negative.
            from_block = max(start, oldest_allowed, 0)
            to_block = current_block

            if from_block > to_block:
                return

            logger.debug(f"Checking blocks {from_block} to {to_block} for CCIP events")

            sent_ok = _scan_event(
                "MessageSent", _MESSAGE_SENT_SIGNATURE, router, from_block, to_block
            )
            executed_ok = _scan_event(
                "MessageExecuted", _MESSAGE_EXECUTED_SIGNATURE, router, from_block, to_block
            )

            if sent_ok and executed_ok:
                last_processed_block = to_block
        except Exception as e:
            logger.error(f"Error monitoring CCIP events: {e}")

    except Exception as e:
        logger.error(f"Error in monitor_ccip_events: {e}")
|
||
|
|
|
||
|
|
|
||
|
|
def start_health_server():
    """Serve HealthCheckHandler on 0.0.0.0:METRICS_PORT, blocking the caller.

    main() runs this in a daemon thread. Bind and serve errors are logged, not
    raised. The ``with`` block closes the listening socket if ``serve_forever``
    ever returns or raises, instead of leaking it.
    """
    try:
        with HTTPServer(('0.0.0.0', METRICS_PORT), HealthCheckHandler) as server:
            logger.info(f"Health check server started on port {METRICS_PORT}")
            server.serve_forever()
    except Exception as e:
        logger.error(f"Error starting health server: {e}")
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Entry point: connect to the RPC, start the HTTP servers, then poll forever.

    Startup order:
      1. Log the effective configuration.
      2. Connect via init_web3(); exit with status 1 on failure.
      3. Start the Prometheus exporter on METRICS_PORT + 1 (non-fatal if it fails).
      4. Start the JSON health server on METRICS_PORT in a daemon thread.
      5. Run one monitoring pass every CHECK_INTERVAL seconds.

    A KeyboardInterrupt anywhere in the loop, including during the 30 s
    back-off sleeps, stops the service with a log line instead of a traceback.
    """
    logger.info("Starting CCIP Monitor Service")
    logger.info(f"RPC URL: {RPC_URL}")
    logger.info(f"CCIP Router: {CCIP_ROUTER_ADDRESS}")
    logger.info(f"CCIP Sender: {CCIP_SENDER_ADDRESS}")
    logger.info(f"Metrics Port: {METRICS_PORT}")
    logger.info(f"Check Interval: {CHECK_INTERVAL} seconds")

    if not init_web3():
        logger.error("Failed to initialize Web3, exiting")
        # SystemExit instead of the site-module exit(): that helper is meant for
        # the interactive interpreter and is absent when Python runs with -S.
        raise SystemExit(1)

    try:
        start_http_server(METRICS_PORT + 1)
        logger.info(f"Prometheus metrics server started on port {METRICS_PORT + 1}")
    except Exception as e:
        logger.warning(f"Could not start Prometheus metrics server: {e}")

    # Daemon thread, so it never keeps the process alive after the loop exits.
    health_thread = Thread(target=start_health_server, daemon=True)
    health_thread.start()

    logger.info("Starting monitoring loop")
    try:
        while True:
            try:
                if not w3.is_connected():
                    logger.warning("Web3 connection lost, attempting to reconnect...")
                    if not init_web3():
                        logger.error("Failed to reconnect to RPC")
                        ccip_rpc_connected.set(0)
                        time.sleep(30)
                        continue

                ccip_rpc_connected.set(1)
                monitor_ccip_events()
                time.sleep(CHECK_INTERVAL)
            except Exception as e:
                # Keep the service running on unexpected errors; back off briefly.
                logger.error(f"Error in main loop: {e}", exc_info=True)
                time.sleep(30)
    except KeyboardInterrupt:
        logger.info("Received interrupt signal, shutting down...")
|
||
|
|
|
||
|
|
|
||
|
|
# Run the service only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||
|
|
|