// Package backfill indexes historical blocks in batches and checkpoints
// progress per chain so that an interrupted run can resume.
package backfill
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"math/big"
|
|
|
|
"github.com/ethereum/go-ethereum/ethclient"
|
|
"github.com/explorer/backend/indexer/processor"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// BackfillWorker handles historical block indexing
|
|
type BackfillWorker struct {
|
|
db *pgxpool.Pool
|
|
client *ethclient.Client
|
|
processor *processor.BlockProcessor
|
|
chainID int
|
|
batchSize int
|
|
startBlock int64
|
|
endBlock int64
|
|
}
|
|
|
|
// NewBackfillWorker creates a new backfill worker
|
|
func NewBackfillWorker(db *pgxpool.Pool, client *ethclient.Client, chainID int, batchSize int) *BackfillWorker {
|
|
proc := processor.NewBlockProcessor(db, client, chainID)
|
|
return &BackfillWorker{
|
|
db: db,
|
|
client: client,
|
|
processor: proc,
|
|
chainID: chainID,
|
|
batchSize: batchSize,
|
|
}
|
|
}
|
|
|
|
// SetRange sets the block range to backfill
|
|
func (bw *BackfillWorker) SetRange(startBlock, endBlock int64) {
|
|
bw.startBlock = startBlock
|
|
bw.endBlock = endBlock
|
|
}
|
|
|
|
// Run starts the backfill process
|
|
func (bw *BackfillWorker) Run(ctx context.Context) error {
|
|
currentBlock := bw.startBlock
|
|
checkpoint := bw.getCheckpoint(ctx)
|
|
|
|
if checkpoint > currentBlock {
|
|
currentBlock = checkpoint
|
|
log.Printf("Resuming from checkpoint: block %d", currentBlock)
|
|
}
|
|
|
|
for currentBlock <= bw.endBlock {
|
|
// Process batch
|
|
endBatch := currentBlock + int64(bw.batchSize) - 1
|
|
if endBatch > bw.endBlock {
|
|
endBatch = bw.endBlock
|
|
}
|
|
|
|
if err := bw.processBatch(ctx, currentBlock, endBatch); err != nil {
|
|
return fmt.Errorf("failed to process batch %d-%d: %w", currentBlock, endBatch, err)
|
|
}
|
|
|
|
// Update checkpoint
|
|
if err := bw.saveCheckpoint(ctx, endBatch); err != nil {
|
|
log.Printf("Warning: failed to save checkpoint: %v", err)
|
|
}
|
|
|
|
log.Printf("Processed blocks %d-%d", currentBlock, endBatch)
|
|
currentBlock = endBatch + 1
|
|
|
|
// Check for cancellation
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processBatch processes a batch of blocks
|
|
func (bw *BackfillWorker) processBatch(ctx context.Context, start, end int64) error {
|
|
for blockNum := start; blockNum <= end; blockNum++ {
|
|
block, err := bw.client.BlockByNumber(ctx, big.NewInt(blockNum))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch block %d: %w", blockNum, err)
|
|
}
|
|
|
|
if err := bw.processor.ProcessBlock(ctx, block); err != nil {
|
|
return fmt.Errorf("failed to process block %d: %w", blockNum, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getCheckpoint gets the last processed block from checkpoint
|
|
func (bw *BackfillWorker) getCheckpoint(ctx context.Context) int64 {
|
|
var checkpoint int64
|
|
query := `SELECT last_block FROM backfill_checkpoints WHERE chain_id = $1`
|
|
err := bw.db.QueryRow(ctx, query, bw.chainID).Scan(&checkpoint)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return checkpoint
|
|
}
|
|
|
|
// saveCheckpoint saves the checkpoint
|
|
func (bw *BackfillWorker) saveCheckpoint(ctx context.Context, blockNum int64) error {
|
|
query := `
|
|
INSERT INTO backfill_checkpoints (chain_id, last_block, updated_at)
|
|
VALUES ($1, $2, NOW())
|
|
ON CONFLICT (chain_id) DO UPDATE SET last_block = $2, updated_at = NOW()
|
|
`
|
|
_, err := bw.db.Exec(ctx, query, bw.chainID, blockNum)
|
|
return err
|
|
}
|