Files

253 lines
6.7 KiB
Go
Raw Permalink Normal View History

package processor
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// BlockProcessor writes blocks, their transactions, and the logs of each
// transaction receipt to the database.
type BlockProcessor struct {
// db is the connection pool on which ProcessBlock opens its transaction.
db *pgxpool.Pool
// client is the RPC client used to fetch transaction receipts. When it is
// nil, getReceipt returns an error.
client *ethclient.Client
// chainID is stored in the chain_id column of every inserted row.
chainID int
}
// NewBlockProcessor returns a BlockProcessor that writes through db, fetches
// receipts through client, and tags every stored row with chainID.
func NewBlockProcessor(db *pgxpool.Pool, client *ethclient.Client, chainID int) *BlockProcessor {
	processor := new(BlockProcessor)
	processor.db = db
	processor.client = client
	processor.chainID = chainID
	return processor
}
// ProcessBlock stores block, every transaction it contains, and the logs of
// each transaction's receipt. All writes go through one database transaction
// that is committed only after every insert has succeeded.
//
// It returns an error when block is nil, when the database transaction cannot
// be started or committed, or when inserting the block or any of its
// transactions fails. Errors from lower layers are wrapped with %w.
func (bp *BlockProcessor) ProcessBlock(ctx context.Context, block *types.Block) error {
	// Reject a nil block before opening a database transaction; every step
	// below dereferences it.
	if block == nil {
		return fmt.Errorf("failed to process block: block is nil")
	}

	tx, err := bp.db.Begin(ctx)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Roll back on every early return. After a successful Commit the call
	// only reports that the transaction is already closed, so its result is
	// deliberately discarded.
	defer func() { _ = tx.Rollback(ctx) }()

	// Insert block
	if err := bp.insertBlock(ctx, tx, block); err != nil {
		return fmt.Errorf("failed to insert block: %w", err)
	}

	// Process transactions
	for i, txData := range block.Transactions() {
		if err := bp.processTransaction(ctx, tx, block, txData, i); err != nil {
			return fmt.Errorf("failed to process transaction: %w", err)
		}
	}

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	return nil
}
// insertBlock writes one row describing block to the blocks table through tx.
//
// The row is keyed by (chain_id, number); when such a row already exists the
// statement does nothing. Column notes:
//   - nonce is NULL when the header nonce is zero, otherwise 0x-prefixed hex.
//   - logs_bloom is the decimal string of the bloom filter read as an integer.
//   - difficulty and total_difficulty are NULL when the header carries no
//     difficulty. total_difficulty is a simplification: it holds the block's
//     own difficulty, not a cumulative value.
//   - base_fee_per_gas is NULL when the header carries no base fee.
//
// It returns an error when the base fee does not fit in an int64 or when the
// insert itself fails.
func (bp *BlockProcessor) insertBlock(ctx context.Context, tx pgx.Tx, block *types.Block) error {
	const query = `
INSERT INTO blocks (
chain_id, number, hash, parent_hash, nonce, sha3_uncles,
logs_bloom, transactions_root, state_root, receipts_root,
miner, difficulty, total_difficulty, size, extra_data,
gas_limit, gas_used, timestamp, transaction_count, base_fee_per_gas
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
ON CONFLICT (chain_id, number) DO NOTHING
`

	// Header() hands back a copy on every call, so fetch it once and reuse it.
	header := block.Header()

	var nonce sql.NullString
	if n := header.Nonce.Uint64(); n > 0 {
		nonce = sql.NullString{String: fmt.Sprintf("0x%x", n), Valid: true}
	}

	// The uncle hash is a fixed-size value, so it is always present.
	sha3Uncles := sql.NullString{String: header.UncleHash.Hex(), Valid: true}

	var difficulty, totalDifficulty sql.NullString
	if header.Difficulty != nil {
		difficulty = sql.NullString{String: header.Difficulty.String(), Valid: true}
		totalDifficulty = difficulty // Simplified: not a cumulative total.
	}

	var baseFeePerGas sql.NullInt64
	if header.BaseFee != nil {
		// Int64() is undefined for values outside the int64 range; fail
		// loudly instead of storing a truncated fee.
		if !header.BaseFee.IsInt64() {
			return fmt.Errorf("base fee %s of block %s does not fit in int64", header.BaseFee.String(), block.Number().String())
		}
		baseFeePerGas = sql.NullInt64{Int64: header.BaseFee.Int64(), Valid: true}
	}

	_, err := tx.Exec(ctx, query,
		bp.chainID,
		block.Number().Int64(),
		block.Hash().Hex(),
		block.ParentHash().Hex(),
		nonce,
		sha3Uncles,
		header.Bloom.Big().String(),
		header.TxHash.Hex(),
		header.Root.Hex(),
		header.ReceiptHash.Hex(),
		block.Coinbase().Hex(),
		difficulty,
		totalDifficulty,
		int64(block.Size()),
		fmt.Sprintf("0x%x", header.Extra),
		header.GasLimit,
		header.GasUsed,
		time.Unix(int64(header.Time), 0),
		len(block.Transactions()),
		baseFeePerGas,
	)
	return err
}
// processTransaction fetches the receipt for txData, writes one row to the
// transactions table through tx, and then stores the receipt's logs via
// processLogs.
//
// index is stored as transaction_index. The row is keyed by (chain_id, hash);
// when such a row already exists the insert does nothing. Column notes:
//   - to_address is NULL when the transaction has no recipient.
//   - max_fee_per_gas and max_priority_fee_per_gas are set only for
//     dynamic-fee transactions and are NULL otherwise.
//   - contract_address is NULL when the receipt's contract address is the
//     zero address.
//   - every receipt-derived column is NULL when no receipt is available.
//
// It returns an error when the receipt lookup fails, when the sender cannot
// be recovered from the signature, when a fee value does not fit in an int64,
// or when a database write fails.
func (bp *BlockProcessor) processTransaction(ctx context.Context, tx pgx.Tx, block *types.Block, txData *types.Transaction, index int) error {
	txHash := txData.Hash()

	// Get receipt
	receipt, err := bp.getReceipt(ctx, txHash)
	if err != nil {
		return fmt.Errorf("failed to get receipt: %w", err)
	}

	const query = `
INSERT INTO transactions (
chain_id, hash, block_number, block_hash, transaction_index,
from_address, to_address, value, gas_price, max_fee_per_gas,
max_priority_fee_per_gas, gas_limit, gas_used, nonce,
input_data, status, contract_address, cumulative_gas_used,
effective_gas_price
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
ON CONFLICT (chain_id, hash) DO NOTHING
`

	// A failed recovery would otherwise store the zero address as the
	// sender, so surface the error instead of discarding it.
	from, err := types.Sender(types.LatestSignerForChainID(txData.ChainId()), txData)
	if err != nil {
		return fmt.Errorf("failed to recover sender of transaction %s: %w", txHash.Hex(), err)
	}

	var toAddress sql.NullString
	if to := txData.To(); to != nil {
		toAddress = sql.NullString{String: to.Hex(), Valid: true}
	}

	// Int64() is undefined for values outside the int64 range, so every fee
	// is range-checked before conversion rather than stored truncated.
	gasPrice := txData.GasPrice()
	if !gasPrice.IsInt64() {
		return fmt.Errorf("gas price %s of transaction %s does not fit in int64", gasPrice.String(), txHash.Hex())
	}

	var maxFeePerGas, maxPriorityFeePerGas sql.NullInt64
	if txData.Type() == types.DynamicFeeTxType {
		if feeCap := txData.GasFeeCap(); feeCap != nil {
			if !feeCap.IsInt64() {
				return fmt.Errorf("max fee per gas %s of transaction %s does not fit in int64", feeCap.String(), txHash.Hex())
			}
			maxFeePerGas = sql.NullInt64{Int64: feeCap.Int64(), Valid: true}
		}
		if tipCap := txData.GasTipCap(); tipCap != nil {
			if !tipCap.IsInt64() {
				return fmt.Errorf("max priority fee per gas %s of transaction %s does not fit in int64", tipCap.String(), txHash.Hex())
			}
			maxPriorityFeePerGas = sql.NullInt64{Int64: tipCap.Int64(), Valid: true}
		}
	}

	// Receipt-derived columns. Each one stays NULL when there is no receipt,
	// so a nil receipt is never dereferenced.
	var (
		contractAddress            sql.NullString
		status, effectiveGasPrice  sql.NullInt64
		gasUsed, cumulativeGasUsed any // nil is written as SQL NULL
	)
	if receipt != nil {
		if receipt.ContractAddress != (common.Address{}) {
			contractAddress = sql.NullString{String: receipt.ContractAddress.Hex(), Valid: true}
		}
		status = sql.NullInt64{Int64: int64(receipt.Status), Valid: true}
		gasUsed = receipt.GasUsed
		cumulativeGasUsed = receipt.CumulativeGasUsed
		if price := receipt.EffectiveGasPrice; price != nil {
			if !price.IsInt64() {
				return fmt.Errorf("effective gas price %s of transaction %s does not fit in int64", price.String(), txHash.Hex())
			}
			effectiveGasPrice = sql.NullInt64{Int64: price.Int64(), Valid: true}
		}
	}

	_, err = tx.Exec(ctx, query,
		bp.chainID,
		txHash.Hex(),
		block.Number().Int64(),
		block.Hash().Hex(),
		index,
		from.Hex(),
		toAddress,
		txData.Value().String(),
		gasPrice.Int64(),
		maxFeePerGas,
		maxPriorityFeePerGas,
		txData.Gas(),
		gasUsed,
		txData.Nonce(),
		fmt.Sprintf("0x%x", txData.Data()),
		status,
		contractAddress,
		cumulativeGasUsed,
		effectiveGasPrice,
	)
	if err != nil {
		return err
	}

	// Process logs
	return bp.processLogs(ctx, tx, block, txData, receipt)
}
// processLogs writes one row per entry of receipt.Logs to the logs table
// through tx. A nil receipt is treated as having no logs and yields nil.
//
// log_index is the entry's position within receipt.Logs. Up to four topics
// are stored in topic0..topic3; columns without a matching topic are NULL and
// topics beyond the fourth are ignored. Rows are keyed by
// (chain_id, transaction_hash, log_index); an existing row is left untouched.
// The first failing insert aborts the loop and its error is returned as is.
func (bp *BlockProcessor) processLogs(ctx context.Context, tx pgx.Tx, block *types.Block, txData *types.Transaction, receipt *types.Receipt) error {
	if receipt == nil {
		return nil
	}

	const insertLog = `
INSERT INTO logs (
chain_id, transaction_hash, block_number, block_hash,
log_index, address, topic0, topic1, topic2, topic3, data
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
`

	for position, entry := range receipt.Logs {
		// Copy as many topics as are present, capped at the four columns.
		var topicCols [4]sql.NullString
		for slot := 0; slot < len(topicCols) && slot < len(entry.Topics); slot++ {
			topicCols[slot] = sql.NullString{String: entry.Topics[slot].Hex(), Valid: true}
		}

		if _, err := tx.Exec(ctx, insertLog,
			bp.chainID,
			txData.Hash().Hex(),
			block.Number().Int64(),
			block.Hash().Hex(),
			position,
			entry.Address.Hex(),
			topicCols[0],
			topicCols[1],
			topicCols[2],
			topicCols[3],
			fmt.Sprintf("0x%x", entry.Data),
		); err != nil {
			return err
		}
	}
	return nil
}
// getReceipt looks up the receipt for txHash through the processor's RPC
// client. It returns an error when no client is configured; otherwise it
// returns whatever the client's TransactionReceipt call returns.
func (bp *BlockProcessor) getReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
	if rpc := bp.client; rpc != nil {
		return rpc.TransactionReceipt(ctx, txHash)
	}
	return nil, fmt.Errorf("RPC client not configured")
}