Files
explorer-monorepo/backend/api/rest/mission_control.go
Devin f4e235edc6 chore(ci): align Go to 1.23.x, add staticcheck/govulncheck/gitleaks gates
.github/workflows/ci.yml:
- Go version: 1.22 -> 1.23.4 (matches go.mod's 'go 1.23.0' declaration).
- Split into four jobs with explicit names:
    * test-backend: go vet + go build + go test
    * scan-backend: staticcheck + govulncheck (installed from pinned tags)
    * test-frontend: npm ci + eslint + tsc --noEmit + next build
    * gitleaks: full-history secret scan on every PR
- Branches triggered: master + main + develop (master is the repo
  default; the previous workflow only triggered on main/develop and
  would never have run on the repo's actual PRs).
- actions/checkout@v4, actions/setup-go@v5, actions/setup-node@v4.
- Concurrency group cancels stale runs on the same ref.
- Node and Go caches enabled for faster CI.

.gitleaks.toml (new):
- Extends gitleaks defaults.
- Custom rule 'explorer-legacy-db-password-L@ker' keeps the historical
  password pattern L@kers?\$?2010 wedged in the detection set even
  after rotation, so any re-introduction (via copy-paste from old
  branches, stale docs, etc.) fails CI.
- Allowlists docs/SECURITY.md and CHANGELOG.md where the string is
  cited in rotation context.

backend/staticcheck.conf (new):
- Enables the full SA* correctness set.
- Temporarily disables ST1000/1003/1005/1020/1021/1022, U1000, S1016,
  S1031. These are stylistic/cosmetic checks; the project has a long
  tail of pre-existing hits there that would bloat every PR. Each is
  commented so the disable can be reverted in a dedicated cleanup.

Legit correctness issues surfaced by staticcheck and fixed in this PR:
- backend/analytics/token_distribution.go: 'best-effort MV refresh'
  block no longer dereferences a shadowed 'err'; scope-tight 'if err :='
  used for the subsequent QueryRow.
- backend/api/rest/middleware.go: compressionMiddleware() was parsing
  Accept-Encoding and doing nothing with it. Now it's a literal
  pass-through with a TODO comment pointing at gorilla/handlers.
- backend/api/rest/mission_control.go: shadowed 'err' from
  json.Unmarshal was assigned to an ignored outer binding via
  fmt.Errorf; replaced with a scoped 'if uerr :=' that lets the RPC
  fallback run as intended.
- backend/indexer/traces/tracer.go: best-effort CREATE TABLE no longer
  discards the error implicitly.
- backend/indexer/track2/block_indexer.go: 'latestBlock - uint64(i) >= 0'
  was a tautology on uint64. Replaced with an explicit
  'if uint64(i) > latestBlock { break }' guard so operators running
  count=1000 against a shallow chain don't underflow.
- backend/tracing/tracer.go: introduces a local ctxKey type and two
  constants so WithValue calls stop tripping SA1029.

Verification:
- go build ./... clean.
- go vet ./... clean.
- go test ./... all existing tests PASS.
- staticcheck ./... clean except for the SA1029 hits in
  api/middleware/auth.go and api/track4/operator_scripts_test.go,
  which are resolved by PR #4 once it merges to master.

Advances completion criterion 4 (CI in good health).
2026-04-18 19:10:20 +00:00

533 lines
15 KiB
Go

package rest
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
)
// Validation patterns for hex-encoded identifiers. Both are anchored at the
// start and end of the input and match case-insensitively.
var (
// hexAddrRe matches "0x" followed by exactly 40 hex digits.
hexAddrRe = regexp.MustCompile(`(?i)^0x[0-9a-f]{40}$`)
// hexTxRe matches "0x" followed by exactly 64 hex digits.
hexTxRe = regexp.MustCompile(`(?i)^0x[0-9a-f]{64}$`)
)
// liquidityCacheEntry is one cached upstream response stored in
// liquidityPoolsCache by the liquidity pools proxy handler.
type liquidityCacheEntry struct {
// body is the raw upstream response body, replayed verbatim on a cache hit.
body []byte
// until is the expiry instant; the entry is served only while time.Now()
// is before it.
until time.Time
// ctype is the Content-Type to replay; the handler falls back to
// "application/json" when it is empty.
ctype string
}
// liquidityPoolsCache holds recent liquidity-pool responses. Keys are built by
// the handler as "<lowercased token address>|<chain id>".
var liquidityPoolsCache sync.Map // string -> liquidityCacheEntry
// missionControlMetrics groups in-process counters for the mission-control
// endpoints. The handlers in this file update them with atomic.AddUint64.
var missionControlMetrics struct {
// liquidityCacheHits counts liquidity requests answered from the cache.
liquidityCacheHits uint64
// liquidityCacheMisses counts liquidity requests that went upstream,
// including requests that explicitly bypassed the cache.
liquidityCacheMisses uint64
// liquidityUpstreamFailure counts upstream transport/read errors, generic
// error envelopes returned with a 2xx status, and non-200 upstream statuses.
liquidityUpstreamFailure uint64
// bridgeTraceRequests counts every call to the bridge trace handler.
bridgeTraceRequests uint64
// bridgeTraceFailures counts bridge traces where neither Blockscout nor
// the RPC fallback produced an address.
bridgeTraceFailures uint64
}
// tokenAggregationBase resolves the base URL of the token-aggregation service
// from the environment. TOKEN_AGGREGATION_BASE_URL is consulted first and
// TOKEN_AGGREGATION_URL second; the first one that is non-blank after trimming
// whitespace wins and is returned with trailing slashes removed. It returns ""
// when neither variable is set to a non-blank value.
func tokenAggregationBase() string {
	primary := strings.TrimSpace(os.Getenv("TOKEN_AGGREGATION_BASE_URL"))
	if primary != "" {
		return strings.TrimRight(primary, "/")
	}
	fallback := strings.TrimSpace(os.Getenv("TOKEN_AGGREGATION_URL"))
	if fallback == "" {
		return ""
	}
	return strings.TrimRight(fallback, "/")
}
// looksLikeGenericUpstreamErrorPayload reports whether body is a JSON object
// that carries nothing but an error: it must have a non-blank string "error"
// field, none of the known payload keys ("pools", "tokens", "data", "chains",
// "tree", "quote"), and must not declare a "status" of "healthy"
// (case-insensitive). Blank bodies and bodies that do not decode into a JSON
// object are never treated as error envelopes.
func looksLikeGenericUpstreamErrorPayload(body []byte) bool {
	if len(bytes.TrimSpace(body)) == 0 {
		return false
	}

	var doc map[string]any
	if json.Unmarshal(body, &doc) != nil {
		return false
	}

	msg, isString := doc["error"].(string)
	if !isString || strings.TrimSpace(msg) == "" {
		return false
	}

	// The mere presence of any real payload key (even with a null value)
	// means this is not a bare error envelope.
	for _, key := range []string{"pools", "tokens", "data", "chains", "tree", "quote"} {
		if _, present := doc[key]; present {
			return false
		}
	}

	status, isString := doc["status"].(string)
	return !(isString && strings.EqualFold(status, "healthy"))
}
// blockscoutInternalBase returns the base URL used to reach Blockscout, with
// trailing slashes removed. The value comes from BLOCKSCOUT_INTERNAL_URL
// (whitespace-trimmed); when that is blank, "http://127.0.0.1:4000" is used.
func blockscoutInternalBase() string {
	base := "http://127.0.0.1:4000"
	if configured := strings.TrimSpace(os.Getenv("BLOCKSCOUT_INTERNAL_URL")); configured != "" {
		base = configured
	}
	return strings.TrimRight(base, "/")
}
// missionControlChainID returns the chain ID string used for mission-control
// upstream queries: the whitespace-trimmed CHAIN_ID environment variable, or
// "138" when that is blank.
func missionControlChainID() string {
	chainID := strings.TrimSpace(os.Getenv("CHAIN_ID"))
	if chainID == "" {
		return "138"
	}
	return chainID
}
// rpcURL returns the whitespace-trimmed RPC_URL environment variable, or ""
// when it is unset or blank. Unlike the other base-URL helpers in this file it
// does not strip trailing slashes.
func rpcURL() string {
	return strings.TrimSpace(os.Getenv("RPC_URL"))
}
// handleMissionControlLiquidityTokenPath serves
// GET /api/v1/mission-control/liquidity/token/{addr}/pools as a cached proxy
// to the token-aggregation service.
//
// Request handling:
//   - Only GET is accepted; {addr} must be a 0x-prefixed 40-hex-digit address.
//   - The cache is skipped when the query has refresh=1 or noCache=1, or when
//     the Cache-Control request header contains no-cache or no-store.
//
// Response handling:
//   - The X-Mission-Control-Cache header is set to "hit", "miss" or "bypass".
//   - Upstream 200 responses are cached for 30 seconds, keyed by lowercased
//     address and chain ID.
//   - A 2xx upstream response whose body is only a generic error envelope is
//     relayed with status 502 and is not cached.
//   - An upstream body larger than 4 MiB is rejected with 502 instead of being
//     truncated, so clipped JSON is never served or cached.
//   - Any other upstream status and body are relayed unchanged.
func (s *Server) handleMissionControlLiquidityTokenPath(w http.ResponseWriter, r *http.Request) {
	// Largest upstream body that will be relayed or cached.
	const maxUpstreamBody = 4 << 20

	if r.Method != http.MethodGet {
		writeMethodNotAllowed(w)
		return
	}
	rest := strings.TrimPrefix(r.URL.Path, "/api/v1/mission-control/liquidity/token/")
	rest = strings.Trim(rest, "/")
	parts := strings.Split(rest, "/")
	if len(parts) < 2 || parts[1] != "pools" {
		writeError(w, http.StatusNotFound, "not_found", "expected /liquidity/token/{address}/pools")
		return
	}
	addr := strings.TrimSpace(parts[0])
	if !hexAddrRe.MatchString(addr) {
		writeError(w, http.StatusBadRequest, "bad_request", "invalid token address")
		return
	}
	base := tokenAggregationBase()
	if base == "" {
		writeError(w, http.StatusServiceUnavailable, "service_unavailable", "TOKEN_AGGREGATION_BASE_URL not configured")
		return
	}
	chain := missionControlChainID()
	addrLower := strings.ToLower(addr)
	cacheKey := addrLower + "|" + chain

	// Parse the query string and normalise the header once instead of once
	// per condition.
	query := r.URL.Query()
	cacheControl := strings.ToLower(r.Header.Get("Cache-Control"))
	bypassCache := query.Get("refresh") == "1" ||
		query.Get("noCache") == "1" ||
		strings.Contains(cacheControl, "no-cache") ||
		strings.Contains(cacheControl, "no-store")

	if !bypassCache {
		if ent, ok := liquidityPoolsCache.Load(cacheKey); ok {
			e := ent.(liquidityCacheEntry)
			if time.Now().Before(e.until) {
				atomic.AddUint64(&missionControlMetrics.liquidityCacheHits, 1)
				w.Header().Set("X-Mission-Control-Cache", "hit")
				if e.ctype != "" {
					w.Header().Set("Content-Type", e.ctype)
				} else {
					w.Header().Set("Content-Type", "application/json")
				}
				w.WriteHeader(http.StatusOK)
				_, _ = w.Write(e.body)
				return
			}
		}
	}

	// Everything past this point goes upstream; a bypass is also counted as a miss.
	atomic.AddUint64(&missionControlMetrics.liquidityCacheMisses, 1)
	if bypassCache {
		w.Header().Set("X-Mission-Control-Cache", "bypass")
	} else {
		w.Header().Set("X-Mission-Control-Cache", "miss")
	}

	up, err := url.Parse(base + "/api/v1/tokens/" + url.PathEscape(addr) + "/pools")
	if err != nil {
		writeInternalError(w, "bad upstream URL")
		return
	}
	q := up.Query()
	q.Set("chainId", chain)
	up.RawQuery = q.Encode()

	ctx, cancel := context.WithTimeout(r.Context(), 25*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, up.String(), nil)
	if err != nil {
		writeInternalError(w, err.Error())
		return
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1)
		log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss upstream_error=%v", addrLower, chain, err)
		// NOTE(review): the HTTP client error text typically embeds the
		// upstream URL; confirm that exposing it to API clients is acceptable.
		writeError(w, http.StatusBadGateway, "bad_gateway", err.Error())
		return
	}
	defer resp.Body.Close()

	// Read one byte past the limit so an oversized body can be told apart
	// from one that is exactly at the limit.
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxUpstreamBody+1))
	if err != nil {
		atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1)
		log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss read_error=%v", addrLower, chain, err)
		writeError(w, http.StatusBadGateway, "bad_gateway", "read upstream body failed")
		return
	}
	if len(body) > maxUpstreamBody {
		atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1)
		log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss upstream_status=%d body_too_large=true limit_bytes=%d", addrLower, chain, resp.StatusCode, maxUpstreamBody)
		writeError(w, http.StatusBadGateway, "bad_gateway", "upstream body too large")
		return
	}

	ctype := resp.Header.Get("Content-Type")
	if ctype == "" {
		ctype = "application/json"
	}

	// A 2xx status with an error-only body is an upstream failure in
	// disguise: surface it as 502 and keep it out of the cache.
	isGenericSuccessError := resp.StatusCode >= 200 && resp.StatusCode < 300 && looksLikeGenericUpstreamErrorPayload(body)
	if isGenericSuccessError {
		atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1)
		log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss upstream_status=%d generic_error_envelope=true", addrLower, chain, resp.StatusCode)
		w.Header().Set("Content-Type", ctype)
		w.WriteHeader(http.StatusBadGateway)
		_, _ = w.Write(body)
		return
	}

	if resp.StatusCode == http.StatusOK {
		liquidityPoolsCache.Store(cacheKey, liquidityCacheEntry{
			body:  body,
			until: time.Now().Add(30 * time.Second),
			ctype: ctype,
		})
		cacheMode := "miss"
		if bypassCache {
			cacheMode = "bypass-refresh"
		}
		log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=%s stored_ttl_sec=30", addrLower, chain, cacheMode)
	} else {
		atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1)
		log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss upstream_status=%d", addrLower, chain, resp.StatusCode)
	}
	w.Header().Set("Content-Type", ctype)
	w.WriteHeader(resp.StatusCode)
	_, _ = w.Write(body)
}
// Lazily-loaded address registry state. Within this file all three variables
// are written only inside the registryOnce.Do callback in
// loadAddressRegistry138.
var (
// registryOnce guards the one-time registry load.
registryOnce sync.Once
// registryAddrToKey maps a lowercased 0x address to its registry key.
registryAddrToKey map[string]string
// registryLoadErr is set when the master contracts JSON cannot be found or
// fails to parse; inventory-file problems do not set it.
registryLoadErr error
)
// firstReadableFile walks paths in order and returns the contents and path of
// the first entry that can be read and is not empty. Blank (whitespace-only)
// entries, unreadable files and zero-length files are skipped. When nothing
// qualifies it returns a nil slice, an empty path and a non-nil error.
func firstReadableFile(paths []string) ([]byte, string, error) {
	for i := 0; i < len(paths); i++ {
		candidate := paths[i]
		if strings.TrimSpace(candidate) == "" {
			continue
		}
		content, readErr := os.ReadFile(candidate)
		if readErr != nil || len(content) == 0 {
			continue
		}
		return content, candidate, nil
	}
	return nil, "", fmt.Errorf("no readable file found")
}
// loadAddressRegistry138 returns the address -> registry key map, building it
// on the first call only (guarded by registryOnce).
//
// The map is seeded from chains["138"].contracts in the first readable
// smart-contracts-master.json (SMART_CONTRACTS_MASTER_JSON first, then a few
// relative config paths). If that file cannot be found or parsed,
// registryLoadErr is set and the map stays empty. Entries from the "inventory"
// object of the first readable address-inventory.json are then added for
// addresses not already present; problems with the inventory file are ignored.
// Only string values that look like 0x addresses are kept, and keys are
// lowercased addresses.
func loadAddressRegistry138() map[string]string {
	registryOnce.Do(func() {
		registryAddrToKey = make(map[string]string)

		masterCandidates := []string{
			"config/smart-contracts-master.json",
			"../config/smart-contracts-master.json",
			"../../config/smart-contracts-master.json",
		}
		if override := strings.TrimSpace(os.Getenv("SMART_CONTRACTS_MASTER_JSON")); override != "" {
			// The explicit override is tried before the built-in locations.
			masterCandidates = append([]string{override}, masterCandidates...)
		}
		masterRaw, masterFile, _ := firstReadableFile(masterCandidates)
		if len(masterRaw) == 0 {
			registryLoadErr = fmt.Errorf("smart-contracts-master.json not found")
			return
		}

		var doc map[string]interface{}
		if parseErr := json.Unmarshal(masterRaw, &doc); parseErr != nil {
			registryLoadErr = parseErr
			return
		}
		// Failed assertions yield nil maps, which index and range safely, so a
		// missing level simply produces no entries.
		chainSet, _ := doc["chains"].(map[string]interface{})
		chain138, _ := chainSet["138"].(map[string]interface{})
		contractSet, _ := chain138["contracts"].(map[string]interface{})
		for name, raw := range contractSet {
			if hexAddr, isString := raw.(string); isString && hexAddrRe.MatchString(hexAddr) {
				registryAddrToKey[strings.ToLower(hexAddr)] = name
			}
		}

		var inventoryCandidates []string
		if override := strings.TrimSpace(os.Getenv("EXPLORER_ADDRESS_INVENTORY_FILE")); override != "" {
			inventoryCandidates = append(inventoryCandidates, override)
		}
		if masterFile != "" {
			// Look for an inventory file sitting next to the master file.
			sibling := filepath.Join(filepath.Dir(masterFile), "address-inventory.json")
			inventoryCandidates = append(inventoryCandidates, sibling)
		}
		inventoryCandidates = append(inventoryCandidates,
			"explorer-monorepo/config/address-inventory.json",
			"config/address-inventory.json",
			"../config/address-inventory.json",
			"../../config/address-inventory.json",
		)
		inventoryRaw, _, invErr := firstReadableFile(inventoryCandidates)
		if invErr != nil || len(inventoryRaw) == 0 {
			return
		}

		var inventoryDoc struct {
			Inventory map[string]string `json:"inventory"`
		}
		if json.Unmarshal(inventoryRaw, &inventoryDoc) != nil {
			return
		}
		for name, hexAddr := range inventoryDoc.Inventory {
			if !hexAddrRe.MatchString(hexAddr) {
				continue
			}
			lowered := strings.ToLower(hexAddr)
			// Entries from the master file take precedence.
			if _, taken := registryAddrToKey[lowered]; !taken {
				registryAddrToKey[lowered] = name
			}
		}
	})
	return registryAddrToKey
}
// jsonStringField returns the value of the first key in keys whose entry in m
// is a non-empty string. Missing keys, non-string values and empty strings are
// skipped; "" is returned when no key qualifies. A nil map is allowed.
func jsonStringField(m map[string]interface{}, keys ...string) string {
	for i := range keys {
		text, isString := m[keys[i]].(string)
		if isString && text != "" {
			return text
		}
	}
	return ""
}
// extractEthAddress pulls a lowercased 0x address out of a decoded JSON value.
//
// A string value is whitespace-trimmed and returned if it matches hexAddrRe.
// A JSON object is searched for a "hash" field, then an "address" field, and
// the first non-empty string is returned if it matches hexAddrRe (it is not
// trimmed). Every other case, including nil, yields "".
func extractEthAddress(val interface{}) string {
	if text, isString := val.(string); isString {
		trimmed := strings.TrimSpace(text)
		if hexAddrRe.MatchString(trimmed) {
			return strings.ToLower(trimmed)
		}
		return ""
	}
	if obj, isObject := val.(map[string]interface{}); isObject {
		candidate := jsonStringField(obj, "hash", "address")
		if candidate != "" && hexAddrRe.MatchString(candidate) {
			return strings.ToLower(candidate)
		}
	}
	return ""
}
// fetchBlockscoutTransaction GETs /api/v2/transactions/{tx} from the internal
// Blockscout base URL and returns the response body (read up to 2 MiB), the
// HTTP status code and an error.
//
// Up to two attempts are made, with per-attempt timeouts of 15s and then 25s
// derived from ctx. A transport error, a body read error or a 5xx status on
// the first attempt triggers the second attempt. Any status below 500 is
// returned immediately with a nil error. On the final attempt the outcome is
// returned as-is: (nil, 0, err) for transport/read errors, otherwise the body
// and status with a nil error.
func fetchBlockscoutTransaction(ctx context.Context, tx string) ([]byte, int, error) {
	endpoint := blockscoutInternalBase() + "/api/v2/transactions/" + url.PathEscape(tx)
	budgets := [...]time.Duration{15 * time.Second, 25 * time.Second}
	final := len(budgets) - 1

	var (
		prevBody   []byte
		prevStatus int
		prevErr    error
	)
	for attempt := 0; attempt <= final; attempt++ {
		isLast := attempt == final

		attemptCtx, cancel := context.WithTimeout(ctx, budgets[attempt])
		req, err := http.NewRequestWithContext(attemptCtx, http.MethodGet, endpoint, nil)
		if err != nil {
			// A malformed request will not improve on retry.
			cancel()
			return nil, 0, err
		}

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			cancel()
			if isLast {
				return nil, 0, err
			}
			prevErr = err
			continue
		}

		// The body is read in full before cancel() so the attempt context
		// stays live during the read.
		payload, readErr := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
		resp.Body.Close()
		cancel()
		if readErr != nil {
			if isLast {
				return nil, 0, readErr
			}
			prevErr = readErr
			continue
		}

		prevBody, prevStatus = payload, resp.StatusCode
		switch {
		case resp.StatusCode == http.StatusOK, resp.StatusCode < 500, isLast:
			return payload, resp.StatusCode, nil
		}
		// 5xx on a non-final attempt: try again with the longer timeout.
	}
	return prevBody, prevStatus, prevErr
}
// fetchTransactionViaRPC looks up tx with an eth_getTransactionByHash JSON-RPC
// call against RPC_URL and returns the lowercased from and to addresses.
//
// Either address may be "" (for example when the "to" field is not a string),
// but at least one is non-empty on success. An error is returned when RPC_URL
// is not configured, the HTTP exchange fails or answers with a non-200 status,
// the response cannot be decoded, the node reports a JSON-RPC error, the
// transaction is unknown, or neither address could be extracted.
//
// The call is bounded by a 15 second timeout derived from ctx, because
// http.DefaultClient applies no timeout of its own.
func fetchTransactionViaRPC(ctx context.Context, tx string) (string, string, error) {
	base := rpcURL()
	if base == "" {
		return "", "", fmt.Errorf("RPC_URL not configured")
	}
	payload, err := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  "eth_getTransactionByHash",
		"params":  []interface{}{tx},
	})
	if err != nil {
		return "", "", err
	}

	// Without this a hung RPC node would hold the request open for as long
	// as the caller's context lives.
	ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, base, bytes.NewReader(payload))
	if err != nil {
		return "", "", err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
	if err != nil {
		return "", "", err
	}
	if resp.StatusCode != http.StatusOK {
		return "", "", fmt.Errorf("rpc HTTP %d", resp.StatusCode)
	}

	var rpcResp struct {
		Result map[string]interface{} `json:"result"`
		Error  map[string]interface{} `json:"error"`
	}
	if err := json.Unmarshal(body, &rpcResp); err != nil {
		return "", "", err
	}
	if rpcResp.Error != nil {
		// Include the node's own message when it provides one, so the
		// caller's log line says what went wrong.
		if msg := jsonStringField(rpcResp.Error, "message"); msg != "" {
			return "", "", fmt.Errorf("rpc error: %s", msg)
		}
		return "", "", fmt.Errorf("rpc error")
	}
	if rpcResp.Result == nil {
		return "", "", fmt.Errorf("transaction not found")
	}

	fromAddr := extractEthAddress(jsonStringField(rpcResp.Result, "from"))
	toAddr := extractEthAddress(jsonStringField(rpcResp.Result, "to"))
	if fromAddr == "" && toAddr == "" {
		return "", "", fmt.Errorf("transaction missing from/to")
	}
	return fromAddr, toAddr, nil
}
// HandleMissionControlBridgeTrace handles
// GET /api/v1/mission-control/bridge/trace?tx=0x... and reports the sender and
// recipient of a transaction together with their address-registry labels.
//
// The tx query parameter is required and must be a 0x-prefixed 64-hex-digit
// hash. Addresses are taken from Blockscout first; if Blockscout fails,
// returns a non-200 status, returns undecodable JSON, or yields no address,
// the handler falls back to a JSON-RPC lookup. When both sources fail the
// response is 502.
//
// On success the JSON body is {"data": {...}} with the fields tx_hash, from,
// from_registry, to, to_registry, blockscout_url and source ("blockscout" or
// "rpc_fallback"). A registry_warning field is added when the registry failed
// to load and is empty.
func (s *Server) HandleMissionControlBridgeTrace(w http.ResponseWriter, r *http.Request) {
	atomic.AddUint64(&missionControlMetrics.bridgeTraceRequests, 1)
	if r.Method != http.MethodGet {
		writeMethodNotAllowed(w)
		return
	}
	tx := strings.TrimSpace(r.URL.Query().Get("tx"))
	if tx == "" {
		writeError(w, http.StatusBadRequest, "bad_request", "missing tx query parameter")
		return
	}
	if !hexTxRe.MatchString(tx) {
		writeError(w, http.StatusBadRequest, "bad_request", "invalid transaction hash")
		return
	}
	txLower := strings.ToLower(tx)

	reg := loadAddressRegistry138()
	publicBase := strings.TrimRight(strings.TrimSpace(os.Getenv("EXPLORER_PUBLIC_BASE")), "/")
	if publicBase == "" {
		publicBase = "https://explorer.d-bis.org"
	}

	fromAddr := ""
	toAddr := ""
	fromLabel := ""
	toLabel := ""
	source := "blockscout"

	body, statusCode, err := fetchBlockscoutTransaction(r.Context(), tx)
	if err == nil && statusCode == http.StatusOK {
		var txDoc map[string]interface{}
		if uerr := json.Unmarshal(body, &txDoc); uerr != nil {
			// The HTTP fetch succeeded but the body was not valid JSON.
			// Record it and continue, so the RPC fallback below can still
			// supply the addresses instead of failing the whole request.
			log.Printf("mission_control bridge_trace tx=%s blockscout_decode_error=%v", txLower, uerr)
		} else {
			fromAddr = extractEthAddress(txDoc["from"])
			toAddr = extractEthAddress(txDoc["to"])
		}
	}

	if fromAddr == "" && toAddr == "" {
		rpcFrom, rpcTo, rpcErr := fetchTransactionViaRPC(r.Context(), tx)
		if rpcErr == nil {
			fromAddr = rpcFrom
			toAddr = rpcTo
			source = "rpc_fallback"
		} else {
			atomic.AddUint64(&missionControlMetrics.bridgeTraceFailures, 1)
			if err != nil {
				log.Printf("mission_control bridge_trace tx=%s fetch_error=%v rpc_fallback_error=%v", txLower, err, rpcErr)
				writeError(w, http.StatusBadGateway, "bad_gateway", err.Error())
				return
			}
			log.Printf("mission_control bridge_trace tx=%s upstream_status=%d rpc_fallback_error=%v", txLower, statusCode, rpcErr)
			msg := fmt.Sprintf("blockscout HTTP %d", statusCode)
			if statusCode == http.StatusOK {
				// Blockscout answered 200 but gave nothing usable; reporting
				// "blockscout HTTP 200" as the failure would be misleading.
				msg = "blockscout response had no usable from/to address"
			}
			writeError(w, http.StatusBadGateway, "blockscout_error", msg)
			return
		}
	}

	if fromAddr != "" {
		fromLabel = reg[fromAddr]
	}
	if toAddr != "" {
		toLabel = reg[toAddr]
	}
	out := map[string]interface{}{
		"tx_hash":        txLower,
		"from":           fromAddr,
		"from_registry":  fromLabel,
		"to":             toAddr,
		"to_registry":    toLabel,
		"blockscout_url": publicBase + "/tx/" + txLower,
		"source":         source,
	}
	if registryLoadErr != nil && len(reg) == 0 {
		out["registry_warning"] = registryLoadErr.Error()
	}
	log.Printf("mission_control bridge_trace tx=%s from=%s to=%s from_label=%s to_label=%s", txLower, fromAddr, toAddr, fromLabel, toLabel)
	writeJSON(w, http.StatusOK, map[string]interface{}{"data": out})
}