#!/usr/bin/env python3
"""
Thorough RPC node testing for ChainID 138 (Besu RPC nodes).

What it does:
- Probes every node listed in RPC_NODES (VMIDs 2101, 2201, 2301, 2303-2308,
  2400-2403) on HTTP port 8545 (override with --port)
- Attempts multiple Host headers to work around Besu rpc-http-host-allowlist
- Runs a JSON-RPC method matrix and records latency + key health signals
- Writes a timestamped Markdown + JSON report under reports/ in the project
  root (the parent of this script's directory)

Usage:
  python3 scripts/test-all-rpc-nodes.py
  python3 scripts/test-all-rpc-nodes.py --port 8545 --timeout 4 --threads 8
"""

from __future__ import annotations

import argparse
import datetime as dt
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from urllib.error import URLError, HTTPError
from urllib.request import Request, urlopen


CHAIN_ID_EXPECTED_HEX = "0x8a"
NET_VERSION_EXPECTED = "138"

# Substring looked for in an error message to recognise a host-allowlist denial.
HOST_DENIED_MARKER = "Host not authorized"

RPC_NODES: List[Dict[str, str]] = [
    # ThirdWeb RPC nodes
    {"vmid": "2400", "ip": "192.168.11.240", "group": "thirdweb", "name": "thirdweb-rpc-1"},
    {"vmid": "2401", "ip": "192.168.11.241", "group": "thirdweb", "name": "besu-rpc-thirdweb-0x8a-1"},
    {"vmid": "2402", "ip": "192.168.11.242", "group": "thirdweb", "name": "besu-rpc-thirdweb-0x8a-2"},
    {"vmid": "2403", "ip": "192.168.11.243", "group": "thirdweb", "name": "besu-rpc-thirdweb-0x8a-3"},
    # Core/Public/Private RPC nodes
    {"vmid": "2101", "ip": "192.168.11.211", "group": "core", "name": "besu-rpc-core-1"},
    {"vmid": "2201", "ip": "192.168.11.221", "group": "public", "name": "besu-rpc-public-1"},
    {"vmid": "2301", "ip": "192.168.11.232", "group": "private", "name": "besu-rpc-private-1"},
    # Tenant RPC nodes
    {"vmid": "2303", "ip": "192.168.11.233", "group": "tenant", "name": "besu-rpc-ali-0x8a"},
    {"vmid": "2304", "ip": "192.168.11.234", "group": "tenant", "name": "besu-rpc-ali-0x1"},
    {"vmid": "2305", "ip": "192.168.11.235", "group": "tenant", "name": "besu-rpc-luis-0x8a"},
    {"vmid": "2306", "ip": "192.168.11.236", "group": "tenant", "name": "besu-rpc-luis-0x1"},
    {"vmid": "2307", "ip": "192.168.11.237", "group": "tenant", "name": "besu-rpc-putu-0x8a"},
    {"vmid": "2308", "ip": "192.168.11.238", "group": "tenant", "name": "besu-rpc-putu-0x1"},
]

DEFAULT_HOST_HEADER_CANDIDATES = [
    # Most permissive/common
    "localhost",
    "127.0.0.1",
    # Known public domains from docs/scripts (helpful if allowlist is domain-based)
    "rpc-http-pub.d-bis.org",
    "rpc.d-bis.org",
    "rpc2.d-bis.org",
    "rpc.public-0138.defi-oracle.io",
]


@dataclass
class RpcCallResult:
    """Outcome of one HTTP JSON-RPC request."""

    # True only when an HTTP success response carried parseable JSON.
    ok: bool
    # Wall-clock time until the body was read (or the failure occurred).
    latency_ms: Optional[float]
    # Parsed JSON body. For HTTP error responses this holds the body when it
    # is a JSON object, even though ``ok`` is False.
    response: Optional[Dict[str, Any]]
    # Short machine-readable failure description, None on success.
    error: Optional[str]
    # HTTP status code when the server answered at all; None for transport
    # failures (connection refused, timeout, ...).
    http_status: Optional[int] = None


def _parse_json_object(raw: str) -> Optional[Dict[str, Any]]:
    """Return *raw* decoded as a JSON object, or None if it is anything else."""
    try:
        parsed = json.loads(raw)
    except (ValueError, RecursionError):
        return None
    return parsed if isinstance(parsed, dict) else None


def _http_post_json(url: str, payload: Dict[str, Any], headers: Dict[str, str], timeout: float) -> RpcCallResult:
    """
    POST *payload* as JSON to *url* and return the outcome; never raises.

    Failures are reported through ``RpcCallResult.error`` using the prefixes
    ``http_error:``, ``url_error:``, ``exception:`` and ``invalid_json:``.
    An HTTP error status still counts as "the server answered": its status
    code and (when JSON) its body are kept on the result.
    """
    data = json.dumps(payload).encode("utf-8")
    req = Request(url, data=data, method="POST")
    for k, v in headers.items():
        req.add_header(k, v)
    req.add_header("Content-Type", "application/json")

    start = time.perf_counter()

    def elapsed_ms() -> float:
        return (time.perf_counter() - start) * 1000.0

    try:
        with urlopen(req, timeout=timeout) as resp:
            status = getattr(resp, "status", None)
            raw = resp.read().decode("utf-8", errors="replace")
    except HTTPError as e:
        latency_ms = elapsed_ms()
        try:
            raw = e.read().decode("utf-8", errors="replace")
        except Exception:
            # Best effort: the body is only diagnostic detail.
            raw = ""
        return RpcCallResult(
            ok=False,
            latency_ms=latency_ms,
            response=_parse_json_object(raw),
            error=f"http_error:{e.code}:{raw[:200]}",
            http_status=e.code,
        )
    except URLError as e:
        return RpcCallResult(ok=False, latency_ms=elapsed_ms(), response=None, error=f"url_error:{e.reason}")
    except Exception as e:
        # Boundary catch-all (e.g. a socket timeout while reading the body):
        # one bad node must not abort the whole run.
        return RpcCallResult(
            ok=False, latency_ms=elapsed_ms(), response=None, error=f"exception:{type(e).__name__}:{e}"
        )

    # Latency covers the network exchange only, not JSON decoding.
    latency_ms = elapsed_ms()
    try:
        parsed = json.loads(raw)
    except (ValueError, RecursionError):
        return RpcCallResult(
            ok=False, latency_ms=latency_ms, response=None, error=f"invalid_json: {raw[:200]}", http_status=status
        )
    return RpcCallResult(ok=True, latency_ms=latency_ms, response=parsed, error=None, http_status=status)


def rpc_call(
    url: str,
    method: str,
    params: Optional[List[Any]] = None,
    host_header: Optional[str] = None,
    timeout: float = 4.0,
    request_id: int = 1,
) -> RpcCallResult:
    """
    Send a single JSON-RPC 2.0 request to *url*.

    *host_header*, when truthy, overrides the HTTP Host header. *params*
    defaults to an empty list.
    """
    headers: Dict[str, str] = {}
    if host_header:
        headers["Host"] = host_header
    payload = {
        "jsonrpc": "2.0",
        "method": method,
        "params": params if params is not None else [],
        "id": request_id,
    }
    return _http_post_json(url, payload, headers=headers, timeout=timeout)


def extract_jsonrpc_error_message(resp: Dict[str, Any]) -> Optional[str]:
    """
    Return the error message carried by a response body, if any.

    Looks at ``error.message`` first and falls back to a top-level
    ``message`` string. Returns None for non-dict input or when neither
    is present.
    """
    if not isinstance(resp, dict):
        return None
    err = resp.get("error")
    if isinstance(err, dict):
        msg = err.get("message")
        if isinstance(msg, str):
            return msg
    # Some proxies respond differently
    msg = resp.get("message")
    if isinstance(msg, str):
        return msg
    return None


def hex_to_int(hex_str: str) -> Optional[int]:
    """Convert a ``0x``-prefixed hex string to int; None for anything else."""
    if not isinstance(hex_str, str) or not hex_str.startswith("0x"):
        return None
    try:
        return int(hex_str, 16)
    except ValueError:
        # e.g. a bare "0x" or non-hex digits
        return None


def unwrap_result(res: RpcCallResult) -> Tuple[Any, Optional[str]]:
    """
    Return ``(result_value, error_string)`` for a finished call.

    ``error_string`` covers both transport failures and JSON-RPC errors;
    it is None when the call succeeded.
    """
    if not res.ok:
        return None, res.error or "request_failed"
    if not isinstance(res.response, dict):
        return None, "invalid_response"
    if "error" in res.response:
        return None, extract_jsonrpc_error_message(res.response) or "jsonrpc_error"
    return res.response.get("result"), None


def classify_probe(probe: RpcCallResult) -> Tuple[bool, bool]:
    """
    Return ``(reachable, authorized)`` for the probe result of a node.

    A node is reachable as soon as it produced any HTTP response, including
    an HTTP error status. It is authorized when the probe yielded a usable
    response that is not a host-allowlist denial.
    """
    answered = probe.ok or probe.http_status is not None
    if not answered:
        return False, False
    if not probe.ok or not probe.response:
        return True, False
    message = extract_jsonrpc_error_message(probe.response)
    if message and HOST_DENIED_MARKER in message:
        return True, False
    return True, True


def average_latency(timings: Dict[str, Optional[float]], failed_keys: Iterable[str]) -> Optional[float]:
    """
    Mean of the numeric entries of *timings* whose key is not in *failed_keys*.

    Returns None when no entry qualifies.
    """
    failed = set(failed_keys)
    good = [float(v) for key, v in timings.items() if isinstance(v, (int, float)) and key not in failed]
    return (sum(good) / len(good)) if good else None


def choose_working_host_header(
    base_url: str, timeout: float, candidates: List[Optional[str]]
) -> Tuple[Optional[str], RpcCallResult]:
    """
    Try eth_chainId with different Host headers. Return first that doesn't look like host-allowlist denial.

    candidates may include None meaning "no explicit Host header override".

    When no candidate works, returns ``(None, failure)`` where *failure* is
    the most recent attempt that got an HTTP-level answer, or the most recent
    attempt overall if the server never answered.
    """
    last = RpcCallResult(ok=False, latency_ms=None, response=None, error="not_tested")
    for host in candidates:
        res = rpc_call(base_url, "eth_chainId", params=[], host_header=host, timeout=timeout, request_id=1)
        # An HTTP-level answer says more about the node than a transport
        # failure, so never let the latter overwrite the former.
        if res.http_status is not None or last.http_status is None:
            last = res
        if not res.ok or not res.response:
            continue
        err_msg = extract_jsonrpc_error_message(res.response)
        if err_msg and HOST_DENIED_MARKER in err_msg:
            continue
        # If result exists, we consider it a success path
        if isinstance(res.response, dict) and "result" in res.response:
            return host, res
        # If no explicit host denial, keep it as acceptable (some clients respond errors for other reasons)
        if not err_msg:
            return host, res
    return None, last


def test_node(
    node: Dict[str, str],
    port: int,
    timeout: float,
    host_header_candidates: List[str],
) -> Dict[str, Any]:
    """
    Probe one node and run the JSON-RPC method matrix against it.

    *node* must provide the keys ``vmid``, ``ip``, ``name`` and ``group``.
    Returns a JSON-serialisable dict with identity fields, ``reachable`` /
    ``authorized`` flags, the raw probe outcome, per-call ``timings_ms`` and
    the derived ``checks``. The matrix is skipped unless the node is
    authorized.
    """
    ip = node["ip"]
    base_url = f"http://{ip}:{port}"

    # None first: try without overriding the Host header.
    candidates: List[Optional[str]] = [None, *host_header_candidates]
    chosen_host, probe = choose_working_host_header(base_url, timeout=timeout, candidates=candidates)
    reachable, authorized = classify_probe(probe)

    checks: Dict[str, Any] = {}
    timings: Dict[str, Optional[float]] = {}
    result: Dict[str, Any] = {
        "vmid": node["vmid"],
        "ip": ip,
        "name": node["name"],
        "group": node["group"],
        "url": base_url,
        "host_header_used": chosen_host,
        "reachable": reachable,
        "authorized": authorized,
        "probe": {
            "ok": probe.ok,
            "latency_ms": probe.latency_ms,
            "error": probe.error,
            "response": probe.response,
            "http_status": probe.http_status,
        },
        "checks": checks,
        "timings_ms": timings,
    }
    if not authorized:
        return result

    method_errors: Dict[str, str] = {}
    # Timing keys of failed calls. Tracked separately from method_errors
    # because a timing key is not always the bare method name.
    failed_timings: Set[str] = set()

    def run(method: str, params: List[Any], rid: int, timing_key: Optional[str] = None) -> Any:
        """Call *method*, record its latency and error, return its result."""
        key = timing_key or method
        res = rpc_call(base_url, method, params=params, host_header=chosen_host, timeout=timeout, request_id=rid)
        timings[key] = res.latency_ms
        value, err = unwrap_result(res)
        if err:
            method_errors[method] = err
            failed_timings.add(key)
        return value

    # 1) Chain ID
    chain_id = run("eth_chainId", [], 2)
    checks["eth_chainId"] = chain_id
    checks["eth_chainId_ok"] = chain_id == CHAIN_ID_EXPECTED_HEX

    # 2) net_version
    net_version = run("net_version", [], 3)
    checks["net_version"] = net_version
    checks["net_version_ok"] = net_version == NET_VERSION_EXPECTED

    # 3) web3_clientVersion
    checks["client_version"] = run("web3_clientVersion", [], 4)

    # 4) Block number
    bn_hex = run("eth_blockNumber", [], 5)
    checks["block_number_hex"] = bn_hex
    checks["block_number"] = hex_to_int(bn_hex)

    # 5) Syncing
    sync_val = run("eth_syncing", [], 6)
    # Besu returns false or an object
    checks["syncing"] = sync_val
    checks["syncing_ok"] = sync_val is False

    # 6) Peer count
    peer_hex = run("net_peerCount", [], 7)
    checks["peer_count_hex"] = peer_hex
    checks["peer_count"] = hex_to_int(peer_hex)

    # 7) Latest block header sanity (optional)
    blk = run("eth_getBlockByNumber", ["latest", False], 8, timing_key="eth_getBlockByNumber_latest")
    if isinstance(blk, dict):
        checks["latest_block_hash"] = blk.get("hash")
        checks["latest_block_timestamp_hex"] = blk.get("timestamp")

    # 8) Gas price
    gp_hex = run("eth_gasPrice", [], 9)
    checks["gas_price_hex"] = gp_hex
    checks["gas_price_wei"] = hex_to_int(gp_hex)

    # 9) txpool_status (may be disabled)
    txp_val = run("txpool_status", [], 10)
    if txp_val is not None:
        checks["txpool_status"] = txp_val
    checks["txpool_supported"] = txp_val is not None

    if method_errors:
        checks["method_errors"] = method_errors

    # Aggregate latency
    # Avg over successful calls only (exclude timeouts/errors)
    checks["avg_latency_ms"] = average_latency(timings, failed_timings)

    return result


# The name starts with "test_" but this is not a pytest test; opt out of
# collection in case this module is ever imported from a test module.
test_node.__test__ = False


def ensure_reports_dir(project_root: str) -> str:
    """Create ``<project_root>/reports`` if needed and return its path."""
    reports_dir = os.path.join(project_root, "reports")
    os.makedirs(reports_dir, exist_ok=True)
    return reports_dir


def render_markdown_report(summary: Dict[str, Any], nodes: List[Dict[str, Any]]) -> str:
    """
    Render the Markdown report for *summary* and the per-node results.

    *summary* must contain ``generated_at``, ``authorized_ok_count``,
    ``total_nodes`` and ``reachable_count``; the remaining summary fields are
    optional and render as ``None`` when absent.
    """
    ts = summary["generated_at"]
    ok = summary["authorized_ok_count"]
    total = summary["total_nodes"]
    reachable = summary["reachable_count"]

    def cell(value: Any) -> Any:
        """Show a dash for missing values."""
        return value if value is not None else "-"

    lines: List[str] = []
    lines.append("# RPC Nodes Test Report (ChainID 138)\n")
    lines.append(f"- Generated: **{ts}**\n")
    lines.append(f"- Nodes: **{total}** (reachable: **{reachable}**, authorized+responding: **{ok}**)\n")
    lines.append("")

    lines.append("## Summary\n")
    lines.append("")
    lines.append("| VMID | Name | IP | Reachable | Authorized | ChainId | NetVersion | Block | Peers | Syncing | Avg Latency (ms) | Host Header Used |")
    lines.append("|------|------|----|-----------|------------|---------|------------|-------|-------|---------|------------------|------------------|")

    for n in nodes:
        chk = n.get("checks", {})
        syncing_ok = chk.get("syncing_ok")
        avg_lat = chk.get("avg_latency_ms")

        if syncing_ok is True:
            sync_cell = "✅"
        elif syncing_ok is False:
            sync_cell = "⚠️"
        else:
            sync_cell = "-"

        lines.append(
            "| {vmid} | {name} | {ip} | {reachable} | {authorized} | {chain} | {netv} | {block} | {peers} | {sync} | {lat} | {host} |".format(
                vmid=n.get("vmid"),
                name=n.get("name"),
                ip=n.get("ip"),
                reachable="✅" if n.get("reachable") else "❌",
                authorized="✅" if n.get("authorized") else "❌",
                chain=cell(chk.get("eth_chainId")),
                netv=cell(chk.get("net_version")),
                block=cell(chk.get("block_number")),
                peers=cell(chk.get("peer_count")),
                sync=sync_cell,
                lat=f"{avg_lat:.1f}" if isinstance(avg_lat, (int, float)) else "-",
                host=n.get("host_header_used") or "-",
            )
        )

    lines.append("")
    lines.append("## Cluster Consistency\n")
    lines.append("")
    lines.append(f"- Block range (authorized nodes): **{summary.get('min_block')}** → **{summary.get('max_block')}** (Δ **{summary.get('block_spread')}**)\n")
    lines.append(f"- Expected chainId: **{CHAIN_ID_EXPECTED_HEX}**; nodes matching: **{summary.get('chainid_match_count')}**\n")
    lines.append(f"- Expected net_version: **{NET_VERSION_EXPECTED}**; nodes matching: **{summary.get('netversion_match_count')}**\n")
    lines.append("")

    lines.append("## Notes\n")
    lines.append("")
    lines.append("- If a node is **reachable but not authorized**, it likely has `rpc-http-host-allowlist` restrictions. This report attempts common Host headers (`localhost`, known RPC domains) to work around that.\n")
    lines.append("- If a node is **not reachable**, it’s either stopped, firewalled, or the network path from this runner to `192.168.11.0/24` is down.\n")
    lines.append("")

    return "\n".join(lines)


def main() -> int:
    """
    Test every node in RPC_NODES concurrently and write the reports.

    Returns the process exit code, which is always 0: node failures are
    reported in the output files, not through the exit status.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--port", type=int, default=8545)
    ap.add_argument("--timeout", type=float, default=4.0)
    ap.add_argument("--threads", type=int, default=8)
    ap.add_argument(
        "--host-header",
        action="append",
        default=[],
        help="Additional Host header candidate (repeatable).",
    )
    args = ap.parse_args()

    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    reports_dir = ensure_reports_dir(project_root)

    host_candidates = DEFAULT_HOST_HEADER_CANDIDATES + list(args.host_header or [])

    # One timestamp for both the report body and the file names.
    now_utc = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    generated_at = now_utc.isoformat().replace("+00:00", "Z")
    stamp = now_utc.strftime("%Y%m%d_%H%M%S")

    results: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=max(1, args.threads)) as ex:
        futs = [
            ex.submit(test_node, node, args.port, args.timeout, host_candidates)
            for node in RPC_NODES
        ]
        for f in as_completed(futs):
            results.append(f.result())

    # Stable ordering for output
    results.sort(key=lambda r: int(r.get("vmid", "0")))

    authorized_nodes = [r for r in results if r.get("authorized")]
    blocks = [r.get("checks", {}).get("block_number") for r in authorized_nodes]
    blocks_int = [b for b in blocks if isinstance(b, int)]
    min_block = min(blocks_int) if blocks_int else None
    max_block = max(blocks_int) if blocks_int else None
    block_spread = (max_block - min_block) if (min_block is not None and max_block is not None) else None

    summary = {
        "generated_at": generated_at,
        "total_nodes": len(results),
        "reachable_count": sum(1 for r in results if r.get("reachable")),
        "authorized_ok_count": sum(1 for r in results if r.get("authorized")),
        "chainid_match_count": sum(
            1 for r in authorized_nodes if r.get("checks", {}).get("eth_chainId") == CHAIN_ID_EXPECTED_HEX
        ),
        "netversion_match_count": sum(
            1 for r in authorized_nodes if r.get("checks", {}).get("net_version") == NET_VERSION_EXPECTED
        ),
        "min_block": min_block,
        "max_block": max_block,
        "block_spread": block_spread,
        "port": args.port,
        "timeout_s": args.timeout,
        "threads": args.threads,
        "host_header_candidates": host_candidates,
    }

    out_json = os.path.join(reports_dir, f"rpc_nodes_test_{stamp}.json")
    out_md = os.path.join(reports_dir, f"rpc_nodes_test_{stamp}.md")

    with open(out_json, "w", encoding="utf-8") as f:
        json.dump({"summary": summary, "nodes": results}, f, indent=2, sort_keys=False)

    md = render_markdown_report(summary, results)
    with open(out_md, "w", encoding="utf-8") as f:
        f.write(md)

    print(f"Wrote JSON report: {out_json}")
    print(f"Wrote Markdown report: {out_md}")
    print("")
    print(f"Reachable: {summary['reachable_count']}/{summary['total_nodes']}")
    print(f"Authorized+responding: {summary['authorized_ok_count']}/{summary['total_nodes']}")
    if block_spread is not None:
        print(f"Block spread: {block_spread} (min={min_block}, max={max_block})")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())