package processor

import (
	"context"
	"database/sql"
	"fmt"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// BlockProcessor stores blocks, their transactions and the transactions'
// logs in the database, fetching receipts through an Ethereum RPC client.
type BlockProcessor struct {
	db      *pgxpool.Pool
	client  *ethclient.Client
	chainID int
}

// NewBlockProcessor returns a BlockProcessor that writes through db, reads
// receipts through client and tags every row it inserts with chainID.
func NewBlockProcessor(db *pgxpool.Pool, client *ethclient.Client, chainID int) *BlockProcessor {
	return &BlockProcessor{
		db:      db,
		client:  client,
		chainID: chainID,
	}
}

// ProcessBlock inserts block, all of its transactions and their logs inside
// a single database transaction. Any failure rolls the whole block back.
//
// It returns an error if block is nil, if the database transaction cannot be
// started or committed, or if any insert or receipt lookup fails.
func (bp *BlockProcessor) ProcessBlock(ctx context.Context, block *types.Block) error {
	if block == nil {
		return fmt.Errorf("block is nil")
	}

	tx, err := bp.db.Begin(ctx)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Rollback after a successful Commit only reports that the transaction
	// is already closed, so its result is deliberately ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	if err := bp.insertBlock(ctx, tx, block); err != nil {
		return fmt.Errorf("failed to insert block: %w", err)
	}

	for i, txData := range block.Transactions() {
		if err := bp.processTransaction(ctx, tx, block, txData, i); err != nil {
			return fmt.Errorf("failed to process transaction: %w", err)
		}
	}

	return tx.Commit(ctx)
}

// insertBlock writes the header fields of block into the blocks table using
// tx. A row that already exists for (chain_id, number) is left untouched.
//
// It returns an error if the base fee does not fit in an int64 or if the
// insert itself fails.
func (bp *BlockProcessor) insertBlock(ctx context.Context, tx pgx.Tx, block *types.Block) error {
	const query = `
		INSERT INTO blocks (
			chain_id, number, hash, parent_hash, nonce, sha3_uncles, logs_bloom,
			transactions_root, state_root, receipts_root, miner, difficulty,
			total_difficulty, size, extra_data, gas_limit, gas_used, timestamp,
			transaction_count, base_fee_per_gas
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
		ON CONFLICT (chain_id, number) DO NOTHING
	`

	// Fetch the header once instead of once per field.
	header := block.Header()

	// A zero nonce is stored as NULL.
	var nonce sql.NullString
	if n := header.Nonce.Uint64(); n > 0 {
		nonce = sql.NullString{String: fmt.Sprintf("0x%x", n), Valid: true}
	}

	// The uncle hash is a fixed-size value, so it is always present.
	sha3Uncles := sql.NullString{String: header.UncleHash.Hex(), Valid: true}

	var difficulty, totalDifficulty sql.NullString
	if header.Difficulty != nil {
		difficulty = sql.NullString{String: header.Difficulty.String(), Valid: true}
		// Simplified: the block's own difficulty is stored as the total
		// difficulty; no cumulative value is computed here.
		totalDifficulty = difficulty
	}

	baseFeePerGas, err := nullInt64FromBig("base fee per gas", header.BaseFee)
	if err != nil {
		return err
	}

	// Normalise to UTC so the stored value does not depend on the host's
	// local time zone should the column be a zone-less timestamp.
	// NOTE(review): column type is not visible here — confirm.
	timestamp := time.Unix(int64(header.Time), 0).UTC()

	_, err = tx.Exec(ctx, query,
		bp.chainID,
		block.Number().Int64(),
		block.Hash().Hex(),
		block.ParentHash().Hex(),
		nonce,
		sha3Uncles,
		header.Bloom.Big().String(),
		header.TxHash.Hex(),
		header.Root.Hex(),
		header.ReceiptHash.Hex(),
		block.Coinbase().Hex(),
		difficulty,
		totalDifficulty,
		int64(block.Size()),
		fmt.Sprintf("0x%x", header.Extra),
		header.GasLimit,
		header.GasUsed,
		timestamp,
		len(block.Transactions()),
		baseFeePerGas,
	)
	return err
}

// processTransaction fetches the receipt for txData, inserts the transaction
// (at position index within block) into the transactions table using tx, and
// then stores the receipt's logs. A row that already exists for
// (chain_id, hash) is left untouched.
//
// It returns an error if the receipt cannot be fetched, if the sender cannot
// be recovered from the signature, if any fee value does not fit in an
// int64, or if an insert fails.
func (bp *BlockProcessor) processTransaction(ctx context.Context, tx pgx.Tx, block *types.Block, txData *types.Transaction, index int) error {
	receipt, err := bp.getReceipt(ctx, txData.Hash())
	if err != nil {
		return fmt.Errorf("failed to get receipt: %w", err)
	}

	const query = `
		INSERT INTO transactions (
			chain_id, hash, block_number, block_hash, transaction_index,
			from_address, to_address, value, gas_price, max_fee_per_gas,
			max_priority_fee_per_gas, gas_limit, gas_used, nonce, input_data,
			status, contract_address, cumulative_gas_used, effective_gas_price
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
		ON CONFLICT (chain_id, hash) DO NOTHING
	`

	// A failed signature recovery must not be stored as the zero address.
	from, err := types.Sender(types.LatestSignerForChainID(txData.ChainId()), txData)
	if err != nil {
		return fmt.Errorf("failed to recover sender of transaction %s: %w", txData.Hash().Hex(), err)
	}

	// A nil recipient is stored as NULL.
	var toAddress sql.NullString
	if to := txData.To(); to != nil {
		toAddress = sql.NullString{String: to.Hex(), Valid: true}
	}

	gasPrice, err := nullInt64FromBig("gas price", txData.GasPrice())
	if err != nil {
		return err
	}

	// Fee caps are only recorded for dynamic-fee transactions.
	// NOTE(review): other transaction types that carry fee caps are stored
	// as NULL here — confirm that is intended.
	var maxFeePerGas, maxPriorityFeePerGas sql.NullInt64
	if txData.Type() == types.DynamicFeeTxType {
		if maxFeePerGas, err = nullInt64FromBig("max fee per gas", txData.GasFeeCap()); err != nil {
			return err
		}
		if maxPriorityFeePerGas, err = nullInt64FromBig("max priority fee per gas", txData.GasTipCap()); err != nil {
			return err
		}
	}

	// Every receipt-derived column is NULL when no receipt is available, so a
	// nil receipt can never be dereferenced.
	var (
		status, effectiveGasPrice  sql.NullInt64
		contractAddress            sql.NullString
		gasUsed, cumulativeGasUsed any // nil is sent as SQL NULL
	)
	if receipt != nil {
		status = sql.NullInt64{Int64: int64(receipt.Status), Valid: true}
		gasUsed = receipt.GasUsed
		cumulativeGasUsed = receipt.CumulativeGasUsed
		if receipt.ContractAddress != (common.Address{}) {
			contractAddress = sql.NullString{String: receipt.ContractAddress.Hex(), Valid: true}
		}
		if effectiveGasPrice, err = nullInt64FromBig("effective gas price", receipt.EffectiveGasPrice); err != nil {
			return err
		}
	}

	_, err = tx.Exec(ctx, query,
		bp.chainID,
		txData.Hash().Hex(),
		block.Number().Int64(),
		block.Hash().Hex(),
		index,
		from.Hex(),
		toAddress,
		txData.Value().String(),
		gasPrice,
		maxFeePerGas,
		maxPriorityFeePerGas,
		txData.Gas(),
		gasUsed,
		txData.Nonce(),
		fmt.Sprintf("0x%x", txData.Data()),
		status,
		contractAddress,
		cumulativeGasUsed,
		effectiveGasPrice,
	)
	if err != nil {
		return err
	}

	return bp.processLogs(ctx, tx, block, txData, receipt)
}

// processLogs inserts every log of receipt into the logs table using tx. A
// nil receipt is a no-op. A row that already exists for
// (chain_id, transaction_hash, log_index) is left untouched.
//
// It returns the first insert error encountered.
func (bp *BlockProcessor) processLogs(ctx context.Context, tx pgx.Tx, block *types.Block, txData *types.Transaction, receipt *types.Receipt) error {
	if receipt == nil {
		return nil
	}

	const query = `
		INSERT INTO logs (
			chain_id, transaction_hash, block_number, block_hash, log_index,
			address, topic0, topic1, topic2, topic3, data
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
		ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
	`

	// These values are the same for every log of the transaction.
	txHash := txData.Hash().Hex()
	blockNumber := block.Number().Int64()
	blockHash := block.Hash().Hex()

	// NOTE(review): log_index is the log's position within this receipt, not
	// the log's own Index field — confirm this is the intended meaning.
	for i, lg := range receipt.Logs {
		// Only the first four topics are stored; missing ones stay NULL.
		var topics [4]sql.NullString
		for j := 0; j < len(lg.Topics) && j < len(topics); j++ {
			topics[j] = sql.NullString{String: lg.Topics[j].Hex(), Valid: true}
		}

		_, err := tx.Exec(ctx, query,
			bp.chainID,
			txHash,
			blockNumber,
			blockHash,
			i,
			lg.Address.Hex(),
			topics[0],
			topics[1],
			topics[2],
			topics[3],
			fmt.Sprintf("0x%x", lg.Data),
		)
		if err != nil {
			return err
		}
	}

	return nil
}

// getReceipt fetches the receipt of the transaction identified by txHash from
// the RPC client. It returns an error if no client is configured or if the
// lookup fails.
func (bp *BlockProcessor) getReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
	if bp.client == nil {
		return nil, fmt.Errorf("RPC client not configured")
	}
	return bp.client.TransactionReceipt(ctx, txHash)
}

// nullInt64FromBig converts v into a nullable int64 for storage. A nil v
// yields SQL NULL. A value outside the int64 range is reported as an error
// naming field, because big.Int.Int64 is undefined for such values.
func nullInt64FromBig(field string, v *big.Int) (sql.NullInt64, error) {
	if v == nil {
		return sql.NullInt64{}, nil
	}
	if !v.IsInt64() {
		return sql.NullInt64{}, fmt.Errorf("%s %s overflows int64", field, v.String())
	}
	return sql.NullInt64{Int64: v.Int64(), Valid: true}, nil
}