Files
explorer-monorepo/backend/api/websocket/server.go
defiQUG bdae5a9f6e feat: explorer API, wallet, CCIP scripts, and config refresh
- Backend REST/gateway/track routes, analytics, Blockscout proxy paths.
- Frontend wallet and liquidity surfaces; MetaMask token list alignment.
- Deployment docs, verification scripts, address inventory updates.

Check: go build ./... under backend/ (pass).
Made-with: Cursor
2026-04-07 23:22:12 -07:00

283 lines
5.7 KiB
Go

package websocket
import (
"encoding/json"
"log"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return websocketOriginAllowed(r)
},
}
func websocketOriginAllowed(r *http.Request) bool {
origin := strings.TrimSpace(r.Header.Get("Origin"))
if origin == "" {
return true
}
allowedOrigins := splitAllowedOrigins(os.Getenv("WEBSOCKET_ALLOWED_ORIGINS"))
if len(allowedOrigins) == 0 {
return sameOriginHost(origin, r.Host)
}
for _, allowed := range allowedOrigins {
if allowed == "*" || strings.EqualFold(allowed, origin) {
return true
}
}
return false
}
func splitAllowedOrigins(raw string) []string {
if strings.TrimSpace(raw) == "" {
return nil
}
parts := strings.Split(raw, ",")
origins := make([]string, 0, len(parts))
for _, part := range parts {
trimmed := strings.TrimSpace(part)
if trimmed != "" {
origins = append(origins, trimmed)
}
}
return origins
}
func sameOriginHost(origin, requestHost string) bool {
parsedOrigin, err := url.Parse(origin)
if err != nil {
return false
}
originHost := parsedOrigin.Hostname()
requestHostname := requestHost
if host, _, err := net.SplitHostPort(requestHost); err == nil {
requestHostname = host
}
return strings.EqualFold(originHost, requestHostname)
}
// Server represents the WebSocket server
type Server struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.RWMutex
}
// Client represents a WebSocket client
type Client struct {
conn *websocket.Conn
send chan []byte
server *Server
subscriptions map[string]bool
}
// NewServer creates a new WebSocket server
func NewServer() *Server {
return &Server{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
// Start starts the WebSocket server
func (s *Server) Start() {
for {
select {
case client := <-s.register:
s.mu.Lock()
s.clients[client] = true
count := len(s.clients)
s.mu.Unlock()
log.Printf("Client connected. Total clients: %d", count)
case client := <-s.unregister:
s.mu.Lock()
if _, ok := s.clients[client]; ok {
delete(s.clients, client)
close(client.send)
}
count := len(s.clients)
s.mu.Unlock()
log.Printf("Client disconnected. Total clients: %d", count)
case message := <-s.broadcast:
s.mu.Lock()
for client := range s.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(s.clients, client)
}
}
s.mu.Unlock()
}
}
}
// HandleWebSocket handles WebSocket connections
func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade failed: %v", err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
server: s,
subscriptions: make(map[string]bool),
}
s.register <- client
go client.writePump()
go client.readPump()
}
// Broadcast sends a message to all connected clients
func (s *Server) Broadcast(message []byte) {
s.broadcast <- message
}
// readPump reads messages from the WebSocket connection
func (c *Client) readPump() {
defer func() {
c.server.unregister <- c
c.conn.Close()
}()
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("WebSocket error: %v", err)
}
break
}
// Handle message
var msg map[string]interface{}
if err := json.Unmarshal(message, &msg); err != nil {
continue
}
c.handleMessage(msg)
}
}
// writePump writes messages to the WebSocket connection
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
n := len(c.send)
for i := 0; i < n; i++ {
w.Write([]byte{'\n'})
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// handleMessage handles incoming WebSocket messages
func (c *Client) handleMessage(msg map[string]interface{}) {
msgType, ok := msg["type"].(string)
if !ok {
return
}
switch msgType {
case "subscribe":
channel, _ := msg["channel"].(string)
c.subscriptions[channel] = true
c.sendMessage(map[string]interface{}{
"type": "subscribed",
"channel": channel,
})
case "unsubscribe":
channel, _ := msg["channel"].(string)
delete(c.subscriptions, channel)
c.sendMessage(map[string]interface{}{
"type": "unsubscribed",
"channel": channel,
})
case "ping":
c.sendMessage(map[string]interface{}{
"type": "pong",
"timestamp": time.Now().Unix(),
})
}
}
// sendMessage sends a message to the client
func (c *Client) sendMessage(msg map[string]interface{}) {
data, err := json.Marshal(msg)
if err != nil {
return
}
select {
case c.send <- data:
default:
close(c.send)
}
}