Files

119 lines
3.1 KiB
Go

package backfill
import (
"context"
"fmt"
"log"
"math/big"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/explorer/backend/indexer/processor"
"github.com/jackc/pgx/v5/pgxpool"
)
// BackfillWorker indexes a range of historical blocks for a single chain.
// It walks the range in batches and records a checkpoint in the database
// after each batch.
type BackfillWorker struct {
	// Dependencies supplied to NewBackfillWorker.
	db        *pgxpool.Pool
	client    *ethclient.Client
	processor *processor.BlockProcessor

	// chainID identifies this worker's row in backfill_checkpoints;
	// batchSize is the number of blocks handled between checkpoint writes.
	chainID, batchSize int

	// Inclusive block range to backfill, configured through SetRange.
	startBlock, endBlock int64
}
// NewBackfillWorker builds a BackfillWorker for chainID. The worker keeps db
// and client, and owns a BlockProcessor constructed from the same db, client
// and chainID. batchSize is the number of blocks handled per batch.
//
// The block range is left at its zero value; call SetRange before Run.
func NewBackfillWorker(db *pgxpool.Pool, client *ethclient.Client, chainID int, batchSize int) *BackfillWorker {
	worker := new(BackfillWorker)
	worker.db = db
	worker.client = client
	worker.chainID = chainID
	worker.batchSize = batchSize
	worker.processor = processor.NewBlockProcessor(db, client, chainID)
	return worker
}
// SetRange configures the blocks Run will cover: startBlock through
// endBlock, both inclusive.
func (bw *BackfillWorker) SetRange(startBlock, endBlock int64) {
	bw.startBlock, bw.endBlock = startBlock, endBlock
}
// Run backfills the configured range, startBlock through endBlock inclusive,
// in batches of batchSize blocks.
//
// If a stored checkpoint lies at or beyond startBlock, work resumes at the
// block after the checkpoint. After every batch the checkpoint is advanced to
// the batch's last block; a failed checkpoint write is logged and does not
// stop the run.
//
// Run returns an error when batchSize is not positive, ctx's error when ctx
// is done before a batch starts, or the wrapped error of the first batch that
// fails. It returns nil once every block through endBlock has been processed.
func (bw *BackfillWorker) Run(ctx context.Context) error {
	// A non-positive batch size would never advance currentBlock, so the loop
	// below would spin forever.
	if bw.batchSize <= 0 {
		return fmt.Errorf("invalid batch size %d: must be positive", bw.batchSize)
	}

	currentBlock := bw.startBlock

	// The checkpoint holds the last block that was fully processed, so resume
	// with the block after it. getCheckpoint reports 0 when nothing is
	// stored, which is why a zero checkpoint is not treated as progress.
	if checkpoint := bw.getCheckpoint(ctx); checkpoint > 0 && checkpoint >= currentBlock {
		currentBlock = checkpoint + 1
		log.Printf("Resuming from checkpoint: block %d", currentBlock)
	}

	for currentBlock <= bw.endBlock {
		// Check for cancellation before starting a batch rather than after
		// finishing one, so a run that completes its final batch reports
		// success instead of a cancellation error.
		if err := ctx.Err(); err != nil {
			return err
		}

		// Clamp the last batch to the end of the range.
		endBatch := currentBlock + int64(bw.batchSize) - 1
		if endBatch > bw.endBlock {
			endBatch = bw.endBlock
		}

		if err := bw.processBatch(ctx, currentBlock, endBatch); err != nil {
			return fmt.Errorf("failed to process batch %d-%d: %w", currentBlock, endBatch, err)
		}

		// Checkpointing is best effort: losing one only means this batch is
		// processed again after a restart.
		if err := bw.saveCheckpoint(ctx, endBatch); err != nil {
			log.Printf("Warning: failed to save checkpoint: %v", err)
		}

		log.Printf("Processed blocks %d-%d", currentBlock, endBatch)
		currentBlock = endBatch + 1
	}

	return nil
}
// processBatch fetches and processes each block from start through end
// (inclusive), one at a time and in ascending order. It stops at the first
// fetch or processing failure and returns that error wrapped with the block
// number.
func (bw *BackfillWorker) processBatch(ctx context.Context, start, end int64) error {
	for height := start; height <= end; height++ {
		blk, fetchErr := bw.client.BlockByNumber(ctx, new(big.Int).SetInt64(height))
		if fetchErr != nil {
			return fmt.Errorf("failed to fetch block %d: %w", height, fetchErr)
		}

		procErr := bw.processor.ProcessBlock(ctx, blk)
		if procErr != nil {
			return fmt.Errorf("failed to process block %d: %w", height, procErr)
		}
	}
	return nil
}
// getCheckpoint returns the last_block stored in backfill_checkpoints for
// this worker's chain, or 0 when no value could be read.
//
// The signature has no error return, so a failed lookup cannot be reported to
// the caller. Instead of discarding the error silently, it is logged so that
// a database problem is visible and not mistaken for "nothing stored".
func (bw *BackfillWorker) getCheckpoint(ctx context.Context) int64 {
	const query = `SELECT last_block FROM backfill_checkpoints WHERE chain_id = $1`

	var checkpoint int64
	if err := bw.db.QueryRow(ctx, query, bw.chainID).Scan(&checkpoint); err != nil {
		// Scan also fails when the chain has no row yet, so this line is
		// expected on a first run; the logged error distinguishes that case
		// from a genuine database failure.
		log.Printf("No checkpoint loaded for chain %d: %v", bw.chainID, err)
		return 0
	}
	return checkpoint
}
// saveCheckpoint records blockNum as last_block for this worker's chain in
// backfill_checkpoints. It inserts a row for the chain, or, when one already
// exists, overwrites its last_block and refreshes updated_at. The error from
// the database call is returned unchanged.
func (bw *BackfillWorker) saveCheckpoint(ctx context.Context, blockNum int64) error {
	const upsert = `
INSERT INTO backfill_checkpoints (chain_id, last_block, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (chain_id) DO UPDATE SET last_block = $2, updated_at = NOW()
`
	if _, err := bw.db.Exec(ctx, upsert, bw.chainID, blockNum); err != nil {
		return err
	}
	return nil
}