Files
proxmox/scripts/test-all-rpc-nodes.py

484 lines
19 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Thorough RPC node testing for ChainID 138 (Besu RPC nodes).
What it does:
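- Probes every node listed in RPC_NODES (VMIDs 2101, 2201, 2301, 2303-2308, 2401-2403) on HTTP port 8545 (override with --port)
- Attempts multiple Host headers to work around Besu rpc-http-host-allowlist
- Runs a JSON-RPC method matrix and records latency + key health signals
- Writes a timestamped Markdown + JSON report to reports/ under the project root (the parent of this script's directory), not the current working directory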
Usage:
python3 scripts/test-all-rpc-nodes.py
python3 scripts/test-all-rpc-nodes.py --port 8545 --timeout 4 --threads 8
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import URLError, HTTPError
from urllib.request import Request, urlopen
# Identity every healthy ChainID 138 node must report. Both values derive from
# the decimal chain id: eth_chainId is compared as a 0x-prefixed hex string,
# net_version as a decimal string.
CHAIN_ID_EXPECTED_HEX = hex(138)  # "0x8a"
NET_VERSION_EXPECTED = str(138)  # "138"

# Inventory of RPC nodes to probe. Each (vmid, ip, group, name) row becomes a
# dict with exactly those four string keys, in that order.
RPC_NODES: List[Dict[str, str]] = [
    dict(zip(("vmid", "ip", "group", "name"), row))
    for row in (
        # ThirdWeb RPC nodes
        ("2401", "192.168.11.241", "thirdweb", "besu-rpc-thirdweb-0x8a-1"),
        ("2402", "192.168.11.242", "thirdweb", "besu-rpc-thirdweb-0x8a-2"),
        ("2403", "192.168.11.243", "thirdweb", "besu-rpc-thirdweb-0x8a-3"),
        # Core/Public/Private RPC nodes
        ("2101", "192.168.11.211", "core", "besu-rpc-core-1"),
        ("2201", "192.168.11.221", "public", "besu-rpc-public-1"),
        # NOTE(review): 2101/2201 map to .211/.221, but 2301 maps to .232 rather
        # than .231 -- confirm this address is intentional.
        ("2301", "192.168.11.232", "private", "besu-rpc-private-1"),
        # Tenant RPC nodes
        ("2303", "192.168.11.233", "tenant", "besu-rpc-ali-0x8a"),
        ("2304", "192.168.11.234", "tenant", "besu-rpc-ali-0x1"),
        ("2305", "192.168.11.235", "tenant", "besu-rpc-luis-0x8a"),
        ("2306", "192.168.11.236", "tenant", "besu-rpc-luis-0x1"),
        ("2307", "192.168.11.237", "tenant", "besu-rpc-putu-0x8a"),
        ("2308", "192.168.11.238", "tenant", "besu-rpc-putu-0x1"),
    )
]

# Host header values tried, in order, while searching for one that a node's
# rpc-http-host-allowlist accepts.
DEFAULT_HOST_HEADER_CANDIDATES = [
    # Loopback names: the most permissive/common allowlist entries
    "localhost",
    "127.0.0.1",
    # Known public RPC domains (helpful when the allowlist is domain-based)
    "rpc-http-pub.d-bis.org",
    "rpc.d-bis.org",
    "rpc2.d-bis.org",
    "rpc.public-0138.defi-oracle.io",
]
@dataclass
class RpcCallResult:
    """
    Outcome of a single JSON-RPC HTTP POST.

    ``ok`` reflects transport-level success only: a reply whose body decoded
    as JSON is ``ok`` even if that body carries a JSON-RPC ``error`` object,
    so callers must still inspect ``response``.
    """

    # True when the request completed with a success status and a JSON body.
    ok: bool
    # Wall-clock request time in milliseconds; None when no request was made.
    latency_ms: Optional[float]
    # Decoded JSON body, when one is available.
    response: Optional[Dict[str, Any]]
    # Failure description (transport, HTTP status or decode); None on success.
    error: Optional[str]
def _http_post_json(url: str, payload: Dict[str, Any], headers: Dict[str, str], timeout: float) -> RpcCallResult:
    """
    POST ``payload`` as JSON to ``url`` and decode the JSON reply.

    Never raises: every failure is folded into the returned RpcCallResult.

    Args:
        url: Endpoint to POST to.
        payload: JSON-serialisable request body.
        headers: Extra request headers (e.g. a ``Host`` override).
            ``Content-Type`` is always forced to ``application/json``.
        timeout: Socket timeout in seconds, passed to ``urlopen``.

    Returns:
        - ``ok=True`` with the decoded body for a success reply whose body is JSON.
        - ``ok=False``, ``error="invalid_json: ..."`` when a success body is not JSON.
        - ``ok=False``, ``error="http_error:<code>:<body prefix>"`` on an HTTP
          error status. ``response`` also carries the decoded error body when
          it is JSON, so callers can inspect why the server refused. For
          example, a Besu host-allowlist rejection is expected to be an HTTP
          error with a JSON ``message``; confirm this for the deployed version.
        - ``ok=False``, ``error="url_error:..."`` / ``"exception:..."`` for
          connection-level and unexpected failures.
    """
    data = json.dumps(payload).encode("utf-8")
    req = Request(url, data=data, method="POST")
    for k, v in headers.items():
        req.add_header(k, v)
    # Added last so a caller-supplied header cannot override the body type.
    req.add_header("Content-Type", "application/json")
    start = time.perf_counter()

    def elapsed_ms() -> float:
        return (time.perf_counter() - start) * 1000.0

    try:
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            latency_ms = elapsed_ms()
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return RpcCallResult(ok=False, latency_ms=latency_ms, response=None, error=f"invalid_json: {raw[:200]}")
        return RpcCallResult(ok=True, latency_ms=latency_ms, response=parsed, error=None)
    except HTTPError as e:
        # HTTPError must be caught before URLError (it is a subclass).
        latency_ms = elapsed_ms()
        try:
            raw = e.read().decode("utf-8", errors="replace")
        except Exception:
            raw = ""
        # Keep a JSON error body so callers can tell "server answered and
        # refused" apart from "no answer at all"; ok stays False either way.
        try:
            error_body: Optional[Dict[str, Any]] = json.loads(raw) if raw else None
        except ValueError:
            error_body = None
        return RpcCallResult(ok=False, latency_ms=latency_ms, response=error_body, error=f"http_error:{e.code}:{raw[:200]}")
    except URLError as e:
        return RpcCallResult(ok=False, latency_ms=elapsed_ms(), response=None, error=f"url_error:{e.reason}")
    except Exception as e:
        # Best-effort probe: e.g. a socket timeout while reading the body lands here.
        return RpcCallResult(ok=False, latency_ms=elapsed_ms(), response=None, error=f"exception:{type(e).__name__}:{e}")
def rpc_call(
    url: str,
    method: str,
    params: Optional[List[Any]] = None,
    host_header: Optional[str] = None,
    timeout: float = 4.0,
    request_id: int = 1,
) -> RpcCallResult:
    """
    Send one JSON-RPC 2.0 request over HTTP POST.

    Args:
        url: Node endpoint, e.g. ``http://<ip>:<port>``.
        method: JSON-RPC method name.
        params: Positional params; None is sent as an empty list.
        host_header: Explicit ``Host`` header value; a falsy value (None or "")
            means no override.
        timeout: Socket timeout in seconds.
        request_id: Value for the JSON-RPC ``id`` field.

    Returns:
        The RpcCallResult produced by ``_http_post_json``.
    """
    body: Dict[str, Any] = {
        "jsonrpc": "2.0",
        "method": method,
        "params": [] if params is None else params,
        "id": request_id,
    }
    extra_headers: Dict[str, str] = {"Host": host_header} if host_header else {}
    return _http_post_json(url, body, headers=extra_headers, timeout=timeout)
def extract_jsonrpc_error_message(resp: Dict[str, Any]) -> Optional[str]:
    """
    Pull a human-readable error message out of a decoded JSON reply.

    The JSON-RPC 2.0 ``error.message`` string is preferred. A top-level
    ``message`` string is used as a fallback, since some proxies/servers
    reply in that shape instead.

    Returns:
        The message, or None when ``resp`` is not a dict or neither field
        holds a string.
    """
    if not isinstance(resp, dict):
        return None
    error_obj = resp.get("error")
    nested = error_obj.get("message") if isinstance(error_obj, dict) else None
    if isinstance(nested, str):
        return nested
    top_level = resp.get("message")
    return top_level if isinstance(top_level, str) else None
def hex_to_int(hex_str: str) -> Optional[int]:
    """
    Decode a ``0x``-prefixed hex quantity (e.g. ``"0x8a"``) into an int.

    Returns None for non-strings, for strings without a lowercase ``0x``
    prefix, and for text that ``int(..., 16)`` rejects.
    """
    if isinstance(hex_str, str) and hex_str.startswith("0x"):
        try:
            return int(hex_str, 16)
        except ValueError:
            return None
    return None
def choose_working_host_header(
    base_url: str, timeout: float, candidates: List[Optional[str]]
) -> Tuple[Optional[str], RpcCallResult]:
    """
    Find a Host header the node accepts, probing with ``eth_chainId``.

    ``candidates`` are tried in order; ``None`` means "no explicit Host header
    override".

    Selection:
      1. The first candidate whose reply contains a JSON-RPC ``result`` wins.
      2. Otherwise, return the first candidate that got a JSON reply without
         a "Host not authorized" message (e.g. some other JSON-RPC error).
         Such a node is answering and must not be mistaken for an unreachable
         or unauthorized one.
      3. Otherwise return ``(None, <last attempted result>)``.

    A ``url_error`` result (connection refused/unroutable, or no HTTP response
    within ``timeout``) stops the search. Such failures are not expected to
    depend on the Host header value. Retrying the remaining candidates would
    likely only repeat them, each costing up to ``timeout`` seconds.
    """
    last = RpcCallResult(ok=False, latency_ms=None, response=None, error="not_tested")
    fallback: Optional[Tuple[Optional[str], RpcCallResult]] = None
    for host in candidates:
        res = rpc_call(base_url, "eth_chainId", params=[], host_header=host, timeout=timeout, request_id=1)
        last = res
        if not res.ok or not res.response:
            if (res.error or "").startswith("url_error:"):
                break
            continue
        err_msg = extract_jsonrpc_error_message(res.response)
        if err_msg and "Host not authorized" in err_msg:
            continue
        # A result means this Host header is accepted and the node answers.
        if isinstance(res.response, dict) and "result" in res.response:
            return host, res
        # Answered without an explicit host denial (some nodes return errors
        # for other reasons). Remember the first such header, but keep looking
        # for one that yields a real result.
        if fallback is None:
            fallback = (host, res)
    if fallback is not None:
        return fallback
    return None, last
def _split_rpc_result(res: RpcCallResult) -> Tuple[Any, Optional[str]]:
    """
    Split an RpcCallResult into ``(result_value, error_string)``.

    ``error_string`` covers both transport failures and JSON-RPC ``error``
    replies. It is None only on success; ``result_value`` is then the reply's
    ``result`` (None if the key is absent).
    """
    if not res.ok:
        return None, res.error or "request_failed"
    if not isinstance(res.response, dict):
        return None, "invalid_response"
    if "error" in res.response:
        return None, extract_jsonrpc_error_message(res.response) or "jsonrpc_error"
    return res.response.get("result"), None


def test_node(
    node: Dict[str, str],
    port: int,
    timeout: float,
    host_header_candidates: List[str],
) -> Dict[str, Any]:
    """
    Probe one RPC node and, if it answers and is authorized, run the JSON-RPC health matrix.

    Args:
        node: Inventory entry with "vmid", "ip", "name" and "group" keys.
        port: HTTP JSON-RPC port to target.
        timeout: Per-request timeout in seconds.
        host_header_candidates: Host header values tried after "no override".

    Returns:
        A JSON-serialisable dict containing:
        - the node's identity fields;
        - ``reachable``: the node gave any HTTP answer, including an error status;
        - ``authorized``: the probe was not rejected by the host allowlist;
        - the raw ``probe`` outcome;
        - per-method ``checks`` and ``timings_ms``.
    """
    ip = node["ip"]
    base_url = f"http://{ip}:{port}"
    candidates: List[Optional[str]] = [None, *host_header_candidates]
    chosen_host, probe = choose_working_host_header(base_url, timeout=timeout, candidates=candidates)
    result: Dict[str, Any] = {
        "vmid": node["vmid"],
        "ip": ip,
        "name": node["name"],
        "group": node["group"],
        "url": base_url,
        "host_header_used": chosen_host,
        "reachable": False,
        "authorized": False,
        "probe": {
            "ok": probe.ok,
            "latency_ms": probe.latency_ms,
            "error": probe.error,
            "response": probe.response,
        },
        "checks": {},
        "timings_ms": {},
    }
    if not probe.ok or not probe.response:
        # An HTTP error status still proves the node is up and listening.
        # Besu is expected to reject a non-allowlisted Host this way, which is
        # exactly the "reachable but not authorized" case the report
        # distinguishes from a dead or firewalled node.
        result["reachable"] = (probe.error or "").startswith("http_error:")
        return result
    result["reachable"] = True
    probe_err = extract_jsonrpc_error_message(probe.response) if isinstance(probe.response, dict) else None
    if probe_err and "Host not authorized" in probe_err:
        result["authorized"] = False
        return result
    result["authorized"] = True

    checks: Dict[str, Any] = result["checks"]
    timings: Dict[str, Any] = result["timings_ms"]
    method_errors: Dict[str, str] = {}
    succeeded_keys: List[str] = []  # timing keys of calls that returned a result

    def run(method: str, params: List[Any], rid: int, timing_key: Optional[str] = None) -> Any:
        """Call ``method``; record its latency and any error; return its result (None on error)."""
        key = timing_key or method
        res = rpc_call(base_url, method, params=params, host_header=chosen_host, timeout=timeout, request_id=rid)
        timings[key] = res.latency_ms
        value, err = _split_rpc_result(res)
        if err:
            method_errors[method] = err
        else:
            succeeded_keys.append(key)
        return value

    # 1) Chain ID
    chain_id = run("eth_chainId", [], 2)
    checks["eth_chainId"] = chain_id
    checks["eth_chainId_ok"] = chain_id == CHAIN_ID_EXPECTED_HEX
    # 2) net_version
    net_version = run("net_version", [], 3)
    checks["net_version"] = net_version
    checks["net_version_ok"] = net_version == NET_VERSION_EXPECTED
    # 3) web3_clientVersion
    checks["client_version"] = run("web3_clientVersion", [], 4)
    # 4) Block number
    bn_hex = run("eth_blockNumber", [], 5)
    checks["block_number_hex"] = bn_hex
    checks["block_number"] = hex_to_int(bn_hex) if isinstance(bn_hex, str) else None
    # 5) Syncing: Besu returns false or an object
    sync_val = run("eth_syncing", [], 6)
    checks["syncing"] = sync_val
    checks["syncing_ok"] = sync_val is False
    # 6) Peer count
    peer_hex = run("net_peerCount", [], 7)
    checks["peer_count_hex"] = peer_hex
    checks["peer_count"] = hex_to_int(peer_hex) if isinstance(peer_hex, str) else None
    # 7) Latest block header sanity; timed under its own key
    blk = run("eth_getBlockByNumber", ["latest", False], 8, timing_key="eth_getBlockByNumber_latest")
    if isinstance(blk, dict):
        checks["latest_block_hash"] = blk.get("hash")
        checks["latest_block_timestamp_hex"] = blk.get("timestamp")
    # 8) Gas price
    gp_hex = run("eth_gasPrice", [], 9)
    checks["gas_price_hex"] = gp_hex
    checks["gas_price_wei"] = hex_to_int(gp_hex) if isinstance(gp_hex, str) else None
    # 9) txpool_status (may be disabled)
    txp_val = run("txpool_status", [], 10)
    if txp_val is not None:
        checks["txpool_status"] = txp_val
        checks["txpool_supported"] = True
    else:
        checks["txpool_supported"] = False
    if method_errors:
        checks["method_errors"] = method_errors

    # Average over successful calls only, tracked by timing key. The block
    # query is timed under "eth_getBlockByNumber_latest" but its error is
    # recorded under "eth_getBlockByNumber". Filtering timings by
    # method_errors therefore cannot exclude that call.
    latencies = [
        float(timings[key]) for key in succeeded_keys if isinstance(timings[key], (int, float))
    ]
    checks["avg_latency_ms"] = (sum(latencies) / len(latencies)) if latencies else None
    return result
def ensure_reports_dir(project_root: str) -> str:
    """
    Return the path ``<project_root>/reports``, creating it and any missing parents first.

    Raises whatever ``os.makedirs`` raises, e.g. when the path exists but is
    not a directory.
    """
    reports_path = os.path.join(project_root, "reports")
    if not os.path.isdir(reports_path):
        os.makedirs(reports_path, exist_ok=True)
    return reports_path
def render_markdown_report(summary: Dict[str, Any], nodes: List[Dict[str, Any]]) -> str:
    """
    Render the human-readable Markdown version of the report.

    Args:
        summary: Aggregate dict. Reads generated_at, total_nodes,
            reachable_count, authorized_ok_count, min_block, max_block,
            block_spread, chainid_match_count and netversion_match_count.
        nodes: Per-node result dicts. Reads vmid, name, ip, reachable,
            authorized, host_header_used and a ``checks`` sub-dict.

    Returns:
        The complete Markdown document as one string, ending with a newline.
    """

    def flag(value: Any) -> str:
        # Visible mark for both outcomes so the column is never blank.
        return "✅" if value else "❌"

    def dash(value: Any) -> Any:
        # Missing values render as "-" rather than the literal "None".
        return "-" if value is None else value

    ts = summary["generated_at"]
    ok = summary["authorized_ok_count"]
    total = summary["total_nodes"]
    reachable = summary["reachable_count"]
    lines: List[str] = []
    lines.append("# RPC Nodes Test Report (ChainID 138)\n")
    lines.append(f"- Generated: **{ts}**\n")
    lines.append(f"- Nodes: **{total}** (reachable: **{reachable}**, authorized+responding: **{ok}**)\n")
    lines.append("")
    lines.append("## Summary\n")
    lines.append("")
    lines.append("| VMID | Name | IP | Reachable | Authorized | ChainId | NetVersion | Block | Peers | Syncing | Avg Latency (ms) | Host Header Used |")
    lines.append("|------|------|----|-----------|------------|---------|------------|-------|-------|---------|------------------|------------------|")
    for n in nodes:
        chk = n.get("checks", {})
        syncing_ok = chk.get("syncing_ok")
        avg_lat = chk.get("avg_latency_ms")
        # True = in sync, False = still syncing (or the check failed), None = not checked.
        if syncing_ok is True:
            sync = "✅"
        elif syncing_ok is False:
            sync = "⚠️"
        else:
            sync = "-"
        lines.append(
            "| {vmid} | {name} | {ip} | {reachable} | {authorized} | {chain} | {netv} | {block} | {peers} | {sync} | {lat} | {host} |".format(
                vmid=n.get("vmid"),
                name=n.get("name"),
                ip=n.get("ip"),
                reachable=flag(n.get("reachable")),
                authorized=flag(n.get("authorized")),
                chain=dash(chk.get("eth_chainId")),
                netv=dash(chk.get("net_version")),
                block=dash(chk.get("block_number")),
                peers=dash(chk.get("peer_count")),
                sync=sync,
                lat=f"{avg_lat:.1f}" if isinstance(avg_lat, (int, float)) else "-",
                host=n.get("host_header_used") or "-",
            )
        )
    lines.append("")
    lines.append("## Cluster Consistency\n")
    lines.append("")
    lines.append(
        f"- Block range (authorized nodes): **{dash(summary.get('min_block'))}** → **{dash(summary.get('max_block'))}** (Δ **{dash(summary.get('block_spread'))}**)\n"
    )
    lines.append(f"- Expected chainId: **{CHAIN_ID_EXPECTED_HEX}**; nodes matching: **{summary.get('chainid_match_count')}**\n")
    lines.append(f"- Expected net_version: **{NET_VERSION_EXPECTED}**; nodes matching: **{summary.get('netversion_match_count')}**\n")
    lines.append("")
    lines.append("## Notes\n")
    lines.append("")
    lines.append("- If a node is **reachable but not authorized**, it likely has `rpc-http-host-allowlist` restrictions. This report attempts common Host headers (`localhost`, known RPC domains) to work around that.\n")
    lines.append("- If a node is **not reachable**, it's either stopped, firewalled, or the network path from this runner to `192.168.11.0/24` is down.\n")
    lines.append("")
    return "\n".join(lines)
def main() -> int:
    """
    CLI entry point: probe every node in RPC_NODES concurrently and report the results.

    Writes the JSON and Markdown reports and prints a short summary to stdout.

    Returns:
        Process exit status: always 0. Node failures are reported, not treated
        as fatal. Invalid arguments exit via ``argparse`` with status 2.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--port", type=int, default=8545, help="JSON-RPC HTTP port to probe (default: 8545).")
    ap.add_argument("--timeout", type=float, default=4.0, help="Per-request timeout in seconds (default: 4.0).")
    ap.add_argument("--threads", type=int, default=8, help="Number of nodes probed concurrently (default: 8).")
    ap.add_argument(
        "--host-header",
        action="append",
        default=[],
        help="Additional Host header candidate (repeatable).",
    )
    args = ap.parse_args()
    if args.timeout <= 0:
        # A zero timeout makes sockets non-blocking (and a negative one is
        # rejected), so every node would look unreachable.
        ap.error("--timeout must be greater than 0")
    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    reports_dir = ensure_reports_dir(project_root)
    # De-duplicate (order-preserving) so a user-supplied header that is already
    # a default is not probed twice per node. Empty strings are dropped because
    # rpc_call treats a falsy Host header as "no override", and "no override"
    # is already tried first for every node.
    host_candidates = [
        h for h in dict.fromkeys(DEFAULT_HOST_HEADER_CANDIDATES + list(args.host_header or [])) if h
    ]
    now_utc = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    generated_at = now_utc.isoformat().replace("+00:00", "Z")
    stamp = now_utc.strftime("%Y%m%d_%H%M%S")
    results: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=max(1, args.threads)) as ex:
        futs = [
            ex.submit(test_node, node, args.port, args.timeout, host_candidates)
            for node in RPC_NODES
        ]
        for f in as_completed(futs):
            results.append(f.result())
    # Stable ordering for output
    results.sort(key=lambda r: int(r.get("vmid", "0")))
    authorized_nodes = [r for r in results if r.get("authorized")]
    blocks = [r.get("checks", {}).get("block_number") for r in authorized_nodes]
    blocks_int = [b for b in blocks if isinstance(b, int)]
    min_block = min(blocks_int) if blocks_int else None
    max_block = max(blocks_int) if blocks_int else None
    block_spread = (max_block - min_block) if (min_block is not None and max_block is not None) else None
    summary = {
        "generated_at": generated_at,
        "total_nodes": len(results),
        "reachable_count": sum(1 for r in results if r.get("reachable")),
        "authorized_ok_count": sum(1 for r in results if r.get("authorized")),
        "chainid_match_count": sum(1 for r in authorized_nodes if r.get("checks", {}).get("eth_chainId") == CHAIN_ID_EXPECTED_HEX),
        "netversion_match_count": sum(1 for r in authorized_nodes if r.get("checks", {}).get("net_version") == NET_VERSION_EXPECTED),
        "min_block": min_block,
        "max_block": max_block,
        "block_spread": block_spread,
        "port": args.port,
        "timeout_s": args.timeout,
        "threads": args.threads,
        "host_header_candidates": host_candidates,
    }
    out_json = os.path.join(reports_dir, f"rpc_nodes_test_{stamp}.json")
    out_md = os.path.join(reports_dir, f"rpc_nodes_test_{stamp}.md")
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump({"summary": summary, "nodes": results}, f, indent=2, sort_keys=False)
    md = render_markdown_report(summary, results)
    with open(out_md, "w", encoding="utf-8") as f:
        f.write(md)
    print(f"Wrote JSON report: {out_json}")
    print(f"Wrote Markdown report: {out_md}")
    print("")
    print(f"Reachable: {summary['reachable_count']}/{summary['total_nodes']}")
    print(f"Authorized+responding: {summary['authorized_ok_count']}/{summary['total_nodes']}")
    if block_spread is not None:
        print(f"Block spread: {block_spread} (min={min_block}, max={max_block})")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())