// Package backfill indexes historical blocks in batches and records its
// progress in the backfill_checkpoints table so an interrupted run can resume.
package backfill

import (
	"context"
	"fmt"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/explorer/backend/indexer/processor"
	"github.com/jackc/pgx/v5/pgxpool"
)

// BackfillWorker handles historical block indexing.
//
// It fetches each block in [startBlock, endBlock] from the Ethereum client,
// hands it to the block processor, and persists a per-chain checkpoint after
// every batch.
type BackfillWorker struct {
	db         *pgxpool.Pool
	client     *ethclient.Client
	processor  *processor.BlockProcessor
	chainID    int
	batchSize  int
	startBlock int64
	endBlock   int64
}

// NewBackfillWorker creates a new backfill worker.
//
// The block range defaults to the zero values; call SetRange before Run to
// choose the blocks to index. batchSize must be positive, otherwise Run
// returns an error.
func NewBackfillWorker(db *pgxpool.Pool, client *ethclient.Client, chainID int, batchSize int) *BackfillWorker {
	proc := processor.NewBlockProcessor(db, client, chainID)

	return &BackfillWorker{
		db:        db,
		client:    client,
		processor: proc,
		chainID:   chainID,
		batchSize: batchSize,
	}
}

// SetRange sets the inclusive block range to backfill.
func (bw *BackfillWorker) SetRange(startBlock, endBlock int64) {
	bw.startBlock = startBlock
	bw.endBlock = endBlock
}

// Run starts the backfill process.
//
// Blocks are processed in batches of batchSize from startBlock through
// endBlock. If a checkpoint for this chain exists at or beyond startBlock,
// processing resumes at the block after it; a checkpoint at or beyond
// endBlock therefore leaves nothing to do. Checkpoint reads and writes are
// best-effort: failures are logged and the run continues.
//
// Run returns an error if batchSize is not positive, if fetching or
// processing any block fails, or ctx.Err() if the context is done after a
// batch completes.
func (bw *BackfillWorker) Run(ctx context.Context) error {
	// A non-positive batch size would never advance currentBlock, so the loop
	// below would spin forever. Reject it before touching any dependency.
	if bw.batchSize <= 0 {
		return fmt.Errorf("invalid batch size %d: must be positive", bw.batchSize)
	}

	currentBlock := bw.startBlock

	checkpoint, found, err := bw.loadCheckpoint(ctx)
	switch {
	case err != nil:
		// Best-effort: without a readable checkpoint, start from startBlock.
		log.Printf("Warning: failed to load checkpoint: %v", err)
	case found && checkpoint >= currentBlock:
		// The checkpoint holds the last block already processed, so the next
		// block to index is the one after it.
		currentBlock = checkpoint + 1
		log.Printf("Resuming from checkpoint: block %d", currentBlock)
	}

	for currentBlock <= bw.endBlock {
		// Clamp the final batch to the end of the requested range.
		endBatch := currentBlock + int64(bw.batchSize) - 1
		if endBatch > bw.endBlock {
			endBatch = bw.endBlock
		}

		if err := bw.processBatch(ctx, currentBlock, endBatch); err != nil {
			return fmt.Errorf("failed to process batch %d-%d: %w", currentBlock, endBatch, err)
		}

		// A failed checkpoint write only costs re-processing after a restart,
		// so it is logged rather than aborting the run.
		if err := bw.saveCheckpoint(ctx, endBatch); err != nil {
			log.Printf("Warning: failed to save checkpoint: %v", err)
		}

		log.Printf("Processed blocks %d-%d", currentBlock, endBatch)
		currentBlock = endBatch + 1

		// Check for cancellation between batches.
		if err := ctx.Err(); err != nil {
			return err
		}
	}

	return nil
}

// processBatch fetches and processes every block in [start, end] in ascending
// order, stopping at the first block that fails to fetch or process.
func (bw *BackfillWorker) processBatch(ctx context.Context, start, end int64) error {
	for blockNum := start; blockNum <= end; blockNum++ {
		block, err := bw.client.BlockByNumber(ctx, big.NewInt(blockNum))
		if err != nil {
			return fmt.Errorf("failed to fetch block %d: %w", blockNum, err)
		}

		if err := bw.processor.ProcessBlock(ctx, block); err != nil {
			return fmt.Errorf("failed to process block %d: %w", blockNum, err)
		}
	}

	return nil
}

// loadCheckpoint reads the last processed block recorded for this chain.
//
// found is false when no checkpoint row exists, which lets callers tell
// "nothing recorded yet" apart from "block 0 was recorded". On error the
// returned block is 0 and found is false.
func (bw *BackfillWorker) loadCheckpoint(ctx context.Context) (last int64, found bool, err error) {
	const query = `SELECT last_block FROM backfill_checkpoints WHERE chain_id = $1`

	rows, err := bw.db.Query(ctx, query, bw.chainID)
	if err != nil {
		return 0, false, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Either no row matched or iteration failed; rows.Err tells them apart.
		return 0, false, rows.Err()
	}

	if err := rows.Scan(&last); err != nil {
		return 0, false, err
	}

	return last, true, nil
}

// getCheckpoint gets the last processed block from the checkpoint table.
//
// It returns 0 when no checkpoint exists or when the lookup fails; lookup
// failures are logged. Use loadCheckpoint when the caller must distinguish a
// missing checkpoint from a checkpoint at block 0.
func (bw *BackfillWorker) getCheckpoint(ctx context.Context) int64 {
	last, _, err := bw.loadCheckpoint(ctx)
	if err != nil {
		log.Printf("Warning: failed to load checkpoint: %v", err)
		return 0
	}

	return last
}

// saveCheckpoint records blockNum as the last processed block for this chain,
// inserting the checkpoint row or updating the existing one.
func (bw *BackfillWorker) saveCheckpoint(ctx context.Context, blockNum int64) error {
	const query = `
		INSERT INTO backfill_checkpoints (chain_id, last_block, updated_at)
		VALUES ($1, $2, NOW())
		ON CONFLICT (chain_id) DO UPDATE SET last_block = $2, updated_at = NOW()
	`

	_, err := bw.db.Exec(ctx, query, bw.chainID, blockNum)
	return err
}