Ledger Backfill Strategy
Version: 1.0.0
Last Updated: 2025-01-20
Status: Active Documentation
Overview
This document outlines the strategy for backfilling historical ledger data into the DBIS Core Banking System ledger. The backfill process ensures data integrity, maintains idempotency, and supports resumable operations.
Backfill Scenarios
Scenario 1: Initial System Setup (Empty Ledger)
Use Case: Setting up a new DBIS instance with an empty ledger, populated from an external source (e.g., legacy system, CSV export, external API).
Approach:
- Validate source data integrity
- Transform source data to DBIS ledger format
- Batch insert with idempotency checks
- Verify balance consistency
- Apply constraints after backfill
Scenario 2: Schema Migration (Existing Ledger Data)
Use Case: Migrating existing ledger data to new schema (e.g., adding new fields, restructuring).
Approach:
- Audit existing data
- Transform to new schema format
- Migrate in batches
- Verify data integrity
- Update schema constraints
Scenario 3: Data Reconciliation (Fix Inconsistencies)
Use Case: Fixing inconsistent balances or missing entries discovered during audit.
Approach:
- Identify inconsistencies
- Generate correction entries
- Apply corrections via normal posting function
- Verify balance consistency
- Document corrections in audit log
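The correction step in Scenario 3 can be sketched as a pure function that turns an observed balance discrepancy into a double-entry correction to post through the normal posting path. This is a sketch under assumptions: the SUSPENSE counterparty account and the BALANCE_CORRECTION transaction type are illustrative names, not part of the documented schema, and it follows the convention used by the rollback SQL later in this document (debits increase an account's balance).

```typescript
// Sketch: derive a correction entry from a balance discrepancy (Scenario 3).
// Assumptions: SUSPENSE account and BALANCE_CORRECTION type are placeholder
// conventions; debits increase an account's balance (as in the rollback SQL).

interface CorrectionEntry {
  debitAccountId: string;
  creditAccountId: string;
  amount: string;        // absolute difference, decimal string
  transactionType: string;
  referenceId: string;   // deterministic, so re-runs stay idempotent
}

function buildCorrection(
  accountId: string,
  expectedBalance: number, // from replaying ledger entries
  actualBalance: number,   // from bank_accounts.balance
  suspenseAccountId = 'SUSPENSE',
): CorrectionEntry | null {
  const diff = expectedBalance - actualBalance;
  if (Math.abs(diff) < 0.005) return null; // within rounding tolerance
  return {
    // Account holds less than expected: debit it (increase), credit suspense.
    // Account holds more than expected: credit it, debit suspense.
    debitAccountId: diff > 0 ? accountId : suspenseAccountId,
    creditAccountId: diff > 0 ? suspenseAccountId : accountId,
    amount: Math.abs(diff).toFixed(2),
    transactionType: 'BALANCE_CORRECTION',
    referenceId: `CORRECTION-${accountId}-${Math.abs(diff).toFixed(2)}`,
  };
}
```

Because the reference ID is derived from the discrepancy itself, re-running the reconciliation cannot double-post the same correction.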
Scenario 4: Dual-Ledger Sync (SCB Ledger Backfill)
Use Case: Backfilling historical entries from SCB (Sovereign Central Bank) ledger to DBIS.
Approach:
- Extract entries from SCB ledger
- Transform to DBIS format
- Post to DBIS via outbox pattern
- Track sync status
- Verify dual-ledger consistency
Backfill Architecture
Component Overview
┌─────────────────────────────────────────────────────────────┐
│ Backfill Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ Source │────▶│ Transform │────▶│ Validate │ │
│ │ Reader │ │ Service │ │ Service │ │
│ └─────────────┘ └──────────────┘ └─────────────┘ │
│ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Batch │ │
│ │ Processor │ │
│ └──────────────┘ │
│ │ │
│ ┌───────────┴───────────┐ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Ledger │ │ Checkpoint │ │
│ │ Posting │ │ Service │ │
│ │ Module │ └──────────────┘ │
│ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Audit & │ │
│ │ Verification │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Key Components
- Source Reader: Reads data from source (CSV, API, database, etc.)
- Transform Service: Transforms source data to DBIS ledger format
- Validate Service: Validates entries before posting
- Batch Processor: Processes entries in batches with checkpointing
- Ledger Posting Module: Uses atomic posting function for entries
- Checkpoint Service: Tracks progress for resumable backfill
- Audit & Verification: Validates backfill results
Backfill Process
Step 1: Pre-Backfill Preparation
1.1 Audit Existing Data
Before starting backfill, audit existing data:
-- Check for existing ledger entries
SELECT COUNT(*), MIN(timestamp_utc), MAX(timestamp_utc)
FROM ledger_entries;
-- Check for inconsistent balances
SELECT id, balance, available_balance, reserved_balance
FROM bank_accounts
WHERE available_balance < 0
OR reserved_balance < 0
OR available_balance > balance
OR (available_balance + reserved_balance) > balance;
-- Check for duplicate reference IDs
SELECT ledger_id, reference_id, COUNT(*)
FROM ledger_entries
GROUP BY ledger_id, reference_id
HAVING COUNT(*) > 1;
1.2 Verify Schema
Ensure all required migrations are applied:
-- Verify idempotency constraint exists
SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_name = 'ledger_entries'
AND constraint_name LIKE '%reference%';
-- Verify outbox table exists
SELECT COUNT(*) FROM dual_ledger_outbox;
-- Verify posting function exists
SELECT routine_name
FROM information_schema.routines
WHERE routine_name = 'post_ledger_entry';
1.3 Prepare Source Data
- CSV Export: Ensure format matches expected schema
- API Extraction: Configure API endpoints and authentication
- Database Extraction: Set up connection and query filters
- Legacy System: Configure export format and mapping
Step 2: Data Transformation
2.1 Source Data Format
Source data should be transformed to this format:
interface LedgerEntrySource {
ledgerId: string; // e.g., "MASTER", "SOVEREIGN"
debitAccountId: string; // Account ID
creditAccountId: string; // Account ID
amount: string; // Decimal as string (e.g., "1000.00")
currencyCode: string; // ISO 4217 (e.g., "USD")
assetType: string; // "fiat", "cbdc", "commodity", "security"
transactionType: string; // Transaction type classification
referenceId: string; // Unique reference ID (required for idempotency)
timestampUtc?: string; // ISO 8601 timestamp
fxRate?: string; // FX rate if applicable
metadata?: Record<string, unknown>; // Additional metadata
}
2.2 Transformation Rules
- Account ID Mapping: Map source account identifiers to DBIS account IDs
- Amount Normalization: Convert amounts to standard format (decimal string)
- Currency Validation: Validate currency codes against ISO 4217
- Timestamp Normalization: Convert timestamps to UTC ISO 8601 format
- Reference ID Generation: Generate unique reference IDs if not present
- Metadata Extraction: Extract relevant metadata from source
2.3 Example Transformation Script
// Example: Transform CSV data
function transformCSVToLedgerEntry(csvRow: CSVRow): LedgerEntrySource {
return {
ledgerId: csvRow.ledger || 'MASTER',
debitAccountId: mapAccountId(csvRow.debit_account),
creditAccountId: mapAccountId(csvRow.credit_account),
amount: normalizeAmount(csvRow.amount),
currencyCode: csvRow.currency || 'USD',
assetType: csvRow.asset_type || 'fiat',
transactionType: mapTransactionType(csvRow.txn_type),
referenceId: csvRow.reference_id || generateReferenceId(csvRow),
timestampUtc: csvRow.timestamp || new Date().toISOString(),
fxRate: csvRow.fx_rate || undefined,
metadata: extractMetadata(csvRow),
};
}
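The helper functions referenced above (`normalizeAmount`, `mapTransactionType`, `mapAccountId`, and so on) are left to the integrator. A minimal sketch of two of them, assuming dot-decimal source amounts and an illustrative legacy-code mapping table (production code should prefer a decimal library over `Number` to avoid float rounding):

```typescript
// Sketch implementations of two helpers used by transformCSVToLedgerEntry.
// The accepted input formats and the type-mapping table are assumptions.

// Normalize "1,000.5", " 1000.50 ", or "1000" to a two-decimal string.
// Uses Number for brevity; a decimal library is safer for large amounts.
function normalizeAmount(raw: string): string {
  const cleaned = raw.trim().replace(/,/g, '');
  const value = Number(cleaned);
  if (!Number.isFinite(value) || value < 0) {
    throw new Error(`Invalid amount: ${raw}`);
  }
  return value.toFixed(2);
}

// Map legacy transaction-type codes to DBIS classifications,
// falling back to a generic type for unknown codes.
const TXN_TYPE_MAP: Record<string, string> = {
  DEP: 'DEPOSIT',
  WDL: 'WITHDRAWAL',
  TRF: 'TRANSFER',
};

function mapTransactionType(sourceType: string): string {
  return TXN_TYPE_MAP[sourceType.toUpperCase()] ?? 'LEGACY_OTHER';
}
```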
Step 3: Batch Processing
3.1 Batch Configuration
Configure batch processing parameters:
interface BackfillConfig {
batchSize: number; // Entries per batch (default: 1000)
checkpointInterval: number; // Checkpoint every N batches (default: 10)
maxRetries: number; // Max retries per batch (default: 3)
retryDelay: number; // Initial retry delay in ms (default: 1000)
parallelWorkers: number; // Number of parallel workers (default: 1)
skipDuplicates: boolean; // Skip entries with duplicate reference IDs (default: true)
validateBalances: boolean; // Validate balances after each batch (default: true)
}
3.2 Checkpointing Strategy
Use checkpointing to enable resumable backfill:
-- Create checkpoint table for ledger backfill
CREATE TABLE IF NOT EXISTS ledger_backfill_checkpoints (
id SERIAL PRIMARY KEY,
source_id VARCHAR(255) NOT NULL,
source_type VARCHAR(50) NOT NULL, -- 'CSV', 'API', 'DATABASE', 'SCB'
last_processed_id VARCHAR(255),
last_processed_timestamp TIMESTAMP,
total_processed BIGINT DEFAULT 0,
total_successful BIGINT DEFAULT 0,
total_failed BIGINT DEFAULT 0,
status VARCHAR(50) DEFAULT 'IN_PROGRESS', -- 'IN_PROGRESS', 'COMPLETED', 'FAILED', 'PAUSED'
started_at TIMESTAMP DEFAULT NOW(),
last_checkpoint_at TIMESTAMP DEFAULT NOW(),
completed_at TIMESTAMP,
error_message TEXT,
metadata JSONB,
UNIQUE(source_id, source_type)
);
CREATE INDEX idx_backfill_checkpoints_status
ON ledger_backfill_checkpoints(status);
CREATE INDEX idx_backfill_checkpoints_source
ON ledger_backfill_checkpoints(source_id, source_type);
3.3 Batch Processing Loop
async function processBackfill(
source: DataSource,
config: BackfillConfig
): Promise<BackfillResult> {
const checkpoint = await loadCheckpoint(source.id, source.type);
let processed = 0;
let successful = 0;
let failed = 0;
let lastProcessedId: string | null = null;
let lastProcessedTimestamp: Date | null = null;
while (true) {
// Load batch from source (starting from checkpoint)
const batch = await source.readBatch({
startId: checkpoint?.lastProcessedId,
startTimestamp: checkpoint?.lastProcessedTimestamp,
limit: config.batchSize,
});
if (batch.length === 0) {
break; // No more data
}
// Process batch
const results = await processBatch(batch, config);
// Update counters
processed += batch.length;
successful += results.successful;
failed += results.failed;
// Update checkpoint
lastProcessedId = batch[batch.length - 1].id;
lastProcessedTimestamp = batch[batch.length - 1].timestamp;
await saveCheckpoint({
sourceId: source.id,
sourceType: source.type,
lastProcessedId,
lastProcessedTimestamp,
totalProcessed: processed,
totalSuccessful: successful,
totalFailed: failed,
status: 'IN_PROGRESS',
});
// Validate balances if configured
if (config.validateBalances && processed % (config.checkpointInterval * config.batchSize) === 0) {
await validateBalances();
}
}
// Mark as completed
await saveCheckpoint({
sourceId: source.id,
sourceType: source.type,
status: 'COMPLETED',
completedAt: new Date(),
});
return {
totalProcessed: processed,
totalSuccessful: successful,
totalFailed: failed,
};
}
Step 4: Entry Posting
4.1 Use Atomic Posting Function
Always use the atomic posting function for backfill entries:
async function postBackfillEntry(entry: LedgerEntrySource, config: { skipDuplicates: boolean } = { skipDuplicates: true }): Promise<void> {
try {
// Use atomic posting function via SQL
const result = await prisma.$executeRaw`
SELECT post_ledger_entry(
${entry.ledgerId}::TEXT,
${entry.debitAccountId}::TEXT,
${entry.creditAccountId}::TEXT,
${entry.amount}::NUMERIC,
${entry.currencyCode}::TEXT,
${entry.assetType}::TEXT,
${entry.transactionType}::TEXT,
${entry.referenceId}::TEXT,
${entry.fxRate || null}::NUMERIC,
${entry.metadata ? JSON.stringify(entry.metadata) : null}::JSONB
)
`;
// Verify result
if (!result) {
throw new Error('Failed to post ledger entry');
}
} catch (error) {
// Handle idempotency violation (duplicate reference ID)
if (error.code === '23505' || error.message?.includes('duplicate')) {
// Skip duplicate entries if configured
if (config.skipDuplicates) {
return; // Entry already exists, skip
}
throw new Error(`Duplicate reference ID: ${entry.referenceId}`);
}
throw error;
}
}
4.2 Batch Posting
Post entries in batches for efficiency:
async function processBatch(
entries: LedgerEntrySource[],
config: BackfillConfig
): Promise<{ successful: number; failed: number }> {
let successful = 0;
let failed = 0;
// Process in parallel if configured
if (config.parallelWorkers > 1) {
const chunks = chunkArray(entries, config.parallelWorkers);
const results = await Promise.allSettled(
chunks.map((chunk) => processChunk(chunk, config))
);
results.forEach((result, i) => {
if (result.status === 'fulfilled') {
successful += result.value.successful;
failed += result.value.failed;
} else {
failed += chunks[i].length; // count only the rejected chunk's entries
}
});
} else {
// Sequential processing
for (const entry of entries) {
try {
await postBackfillEntry(entry);
successful++;
} catch (error) {
failed++;
logError(entry, error);
}
}
}
return { successful, failed };
}
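The `chunkArray` helper used above is assumed rather than shown; one straightforward sketch that splits a batch into roughly equal chunks, one per worker:

```typescript
// Split `items` into at most `n` roughly equal chunks for parallel workers.
function chunkArray<T>(items: T[], n: number): T[][] {
  const chunks: T[][] = [];
  const size = Math.ceil(items.length / Math.max(1, n));
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```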
Step 5: Balance Constraints Application
5.1 Pre-Constraint Validation
Before applying balance constraints, validate all balances:
-- Validate all balances are consistent
DO $$
DECLARE
inconsistent_count INTEGER;
BEGIN
SELECT COUNT(*) INTO inconsistent_count
FROM bank_accounts
WHERE available_balance < 0
OR reserved_balance < 0
OR available_balance > balance
OR (available_balance + reserved_balance) > balance;
IF inconsistent_count > 0 THEN
RAISE EXCEPTION 'Found % inconsistent balances. Fix before applying constraints.', inconsistent_count;
END IF;
END $$;
5.2 Apply Constraints
After backfill completes and balances are validated, apply constraints:
# Apply balance constraints migration
psql $DATABASE_URL -f db/migrations/004_balance_constraints.sql
5.3 Post-Constraint Verification
Verify constraints are applied correctly:
-- Check constraint exists
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'bank_accounts'
AND constraint_name LIKE '%balance%';
-- Verify constraint is enforced: run inside a transaction and roll back
-- so no data is actually changed (the UPDATE should raise an error)
BEGIN;
UPDATE bank_accounts
SET available_balance = -1
WHERE id = (SELECT id FROM bank_accounts LIMIT 1);
ROLLBACK;
Step 6: Verification and Reconciliation
6.1 Entry Verification
Verify all entries were posted correctly:
-- Compare source count vs. posted count
SELECT
COUNT(*) as total_entries,
COUNT(DISTINCT reference_id) as unique_references,
COUNT(DISTINCT ledger_id) as unique_ledgers,
MIN(timestamp_utc) as earliest_entry,
MAX(timestamp_utc) as latest_entry
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%'; -- If using prefix for backfill entries
-- Check for missing entries
SELECT source_id, reference_id
FROM backfill_source_data
WHERE NOT EXISTS (
SELECT 1 FROM ledger_entries
WHERE reference_id = backfill_source_data.reference_id
);
6.2 Balance Reconciliation
Reconcile balances after backfill:
-- Compare expected vs. actual balances
-- (per this system's convention, debits increase an account's balance)
SELECT
account_id,
expected_balance,
actual_balance,
(expected_balance - actual_balance) as difference
FROM (
SELECT
m.account_id,
SUM(m.signed_amount) as expected_balance,
(SELECT balance FROM bank_accounts WHERE id = m.account_id) as actual_balance
FROM (
SELECT debit_account_id as account_id, amount as signed_amount FROM ledger_entries
UNION ALL
SELECT credit_account_id as account_id, -amount as signed_amount FROM ledger_entries
) m
WHERE m.account_id IN (SELECT id FROM bank_accounts)
GROUP BY m.account_id
) reconciliation
WHERE ABS(expected_balance - actual_balance) > 0.01; -- Allow small rounding differences
6.3 Dual-Ledger Reconciliation
If backfilling from SCB ledger, reconcile dual-ledger consistency:
-- Check outbox sync status
SELECT
status,
COUNT(*) as count,
MIN(created_at) as oldest,
MAX(created_at) as newest
FROM dual_ledger_outbox
WHERE created_at >= (SELECT MIN(timestamp_utc) FROM ledger_entries WHERE reference_id LIKE 'BACKFILL-%')
GROUP BY status;
-- Verify all entries have corresponding outbox records (for SCB sync)
SELECT le.id, le.reference_id
FROM ledger_entries le
WHERE le.reference_id LIKE 'BACKFILL-%'
AND NOT EXISTS (
SELECT 1 FROM dual_ledger_outbox dlo
WHERE dlo.reference_id = le.reference_id
);
Implementation Scripts
TypeScript Backfill Script
File: dbis_core/scripts/backfill-ledger.ts
#!/usr/bin/env ts-node
import { PrismaClient } from '@prisma/client';
import { readFileSync } from 'fs';
import { parse } from 'csv-parse/sync';
const prisma = new PrismaClient();
interface BackfillConfig {
sourceFile: string;
ledgerId: string;
batchSize: number;
skipDuplicates: boolean;
}
async function backfillFromCSV(config: BackfillConfig) {
// Read and parse CSV
const csvData = readFileSync(config.sourceFile, 'utf-8');
const records = parse(csvData, {
columns: true,
skip_empty_lines: true,
});
let processed = 0;
let successful = 0;
let failed = 0;
// Process in batches
for (let i = 0; i < records.length; i += config.batchSize) {
const batch = records.slice(i, i + config.batchSize);
for (const record of batch) {
try {
// Transform and post entry
await prisma.$executeRaw`
SELECT post_ledger_entry(
${config.ledgerId}::TEXT,
${record.debitAccountId}::TEXT,
${record.creditAccountId}::TEXT,
${record.amount}::NUMERIC,
${record.currencyCode}::TEXT,
${record.assetType || 'fiat'}::TEXT,
${record.transactionType}::TEXT,
${record.referenceId}::TEXT,
${record.fxRate || null}::NUMERIC,
${record.metadata ? JSON.stringify(JSON.parse(record.metadata)) : null}::JSONB
)
`;
successful++;
} catch (error) {
if (config.skipDuplicates && error.code === '23505') {
processed++; // a skipped duplicate still counts as processed
continue;
}
failed++;
console.error(`Failed to post entry ${record.referenceId}:`, error.message);
}
processed++;
}
console.log(`Processed ${processed}/${records.length} entries (${successful} successful, ${failed} failed)`);
}
return { processed, successful, failed };
}
// CLI entry point
const config: BackfillConfig = {
sourceFile: process.env.BACKFILL_SOURCE_FILE || 'backfill.csv',
ledgerId: process.env.LEDGER_ID || 'MASTER',
batchSize: parseInt(process.env.BATCH_SIZE || '1000', 10),
skipDuplicates: process.env.SKIP_DUPLICATES === 'true',
};
backfillFromCSV(config)
.then(async (result) => {
console.log('Backfill completed:', result);
await prisma.$disconnect();
process.exit(0);
})
.catch(async (error) => {
console.error('Backfill failed:', error);
await prisma.$disconnect();
process.exit(1);
});
SQL Backfill Script
File: dbis_core/scripts/backfill-ledger.sql
-- Ledger Backfill Script
-- Use this for direct SQL-based backfill from another database
-- Example: Backfill from external ledger_entries_legacy table
DO $$
DECLARE
batch_size INTEGER := 1000;
processed INTEGER := 0;
successful INTEGER := 0;
failed INTEGER := 0;
entry RECORD;
BEGIN
-- Process row by row, reporting progress every batch_size entries
FOR entry IN
SELECT * FROM ledger_entries_legacy
ORDER BY id
LOOP
BEGIN
-- Post entry using atomic function
PERFORM post_ledger_entry(
entry.ledger_id::TEXT,
entry.debit_account_id::TEXT,
entry.credit_account_id::TEXT,
entry.amount::NUMERIC,
entry.currency_code::TEXT,
entry.asset_type::TEXT,
entry.transaction_type::TEXT,
entry.reference_id::TEXT,
entry.fx_rate::NUMERIC,
entry.metadata::JSONB
);
successful := successful + 1;
EXCEPTION
WHEN unique_violation THEN
-- Duplicate reference ID: entry already exists, skip it
RAISE NOTICE 'Skipping duplicate reference ID: %', entry.reference_id;
WHEN OTHERS THEN
failed := failed + 1;
RAISE NOTICE 'Error processing entry %: %', entry.reference_id, SQLERRM;
END;
processed := processed + 1;
-- Checkpoint every batch_size entries
IF processed % batch_size = 0 THEN
RAISE NOTICE 'Processed % entries (% successful, % failed)', processed, successful, failed;
END IF;
END LOOP;
RAISE NOTICE 'Backfill completed: % total, % successful, % failed', processed, successful, failed;
END $$;
Best Practices
1. Idempotency
- Always use a unique reference_id for each entry
- Use the atomic posting function that enforces idempotency
- Skip duplicates during backfill if they already exist
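When the source has no reference ID, the `generateReferenceId` helper used in the transformation example can derive a deterministic one by hashing the row's identifying fields, so a re-run regenerates the same ID and the unique constraint deduplicates it. A sketch using Node's built-in crypto module (the field names and the BACKFILL- prefix follow the examples in this document):

```typescript
import { createHash } from 'node:crypto';

// Derive a deterministic reference ID from a source row's identifying
// fields, so repeated backfill runs generate the same ID and collide
// harmlessly with the idempotency constraint.
function generateReferenceId(row: {
  debit_account: string;
  credit_account: string;
  amount: string;
  timestamp: string;
}): string {
  const digest = createHash('sha256')
    .update([row.debit_account, row.credit_account, row.amount, row.timestamp].join('|'))
    .digest('hex');
  return `BACKFILL-${digest.slice(0, 32)}`;
}
```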
2. Checkpointing
- Save checkpoint after each batch
- Enable resumable backfill from last checkpoint
- Track progress with metrics (processed, successful, failed)
3. Validation
- Validate source data before transformation
- Validate transformed entries before posting
- Verify balances after backfill completion
- Reconcile with source system if possible
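A minimal pre-posting validator for the transformed entry shape might look like the following sketch; the currency allow-list is an assumption and would normally come from configuration:

```typescript
// Validate a transformed entry before posting; returns a list of
// problems (empty = valid). The currency allow-list is illustrative.
const ALLOWED_CURRENCIES = new Set(['USD', 'EUR', 'GBP']);

function validateEntry(entry: {
  debitAccountId: string;
  creditAccountId: string;
  amount: string;
  currencyCode: string;
  referenceId: string;
}): string[] {
  const errors: string[] = [];
  if (!entry.referenceId) errors.push('missing referenceId');
  if (!/^\d+(\.\d{1,2})?$/.test(entry.amount) || Number(entry.amount) <= 0) {
    errors.push(`invalid amount: ${entry.amount}`);
  }
  if (entry.debitAccountId === entry.creditAccountId) {
    errors.push('debit and credit accounts must differ');
  }
  if (!ALLOWED_CURRENCIES.has(entry.currencyCode)) {
    errors.push(`unknown currency: ${entry.currencyCode}`);
  }
  return errors;
}
```

Entries that fail validation can be written to an error report rather than posted, keeping the posting path clean.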
4. Error Handling
- Log all errors with full context
- Retry transient errors with exponential backoff
- Skip permanent errors (e.g., duplicate reference IDs)
- Generate error report after completion
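The retry guidance above can be captured in a generic wrapper; a sketch, where the transient-error predicate is an assumption to be supplied by the caller:

```typescript
// Retry an async operation with exponential backoff. Errors the
// predicate deems non-transient are rethrown immediately.
async function withRetry<T>(
  op: () => Promise<T>,
  maxRetries = 3,
  baseDelayMs = 1000,
  isTransient: (e: unknown) => boolean = () => true,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      if (!isTransient(err) || attempt >= maxRetries) throw err;
      // Delays grow as baseDelayMs, 2x, 4x, ... per attempt
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
}
```

A batch could then be posted as `withRetry(() => processBatch(batch, config), config.maxRetries, config.retryDelay)`.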
5. Performance
- Process in batches (1000-10000 entries per batch)
- Use parallel processing for large backfills
- Monitor database performance during backfill
- Schedule during low-traffic periods
6. Testing
- Test backfill process on staging environment first
- Use small test dataset to verify transformation
- Verify balances match expected values
- Test rollback procedures if needed
Monitoring and Metrics
Key Metrics to Track
Progress Metrics:
- Total entries to process
- Entries processed
- Entries successful
- Entries failed
- Processing rate (entries/second)
Performance Metrics:
- Batch processing time
- Database query time
- Checkpoint save time
- Total elapsed time
Quality Metrics:
- Duplicate entries skipped
- Validation errors
- Balance inconsistencies
- Reconciliation mismatches
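The progress metrics reduce to a small calculation; a sketch deriving rate and a rough ETA from the counters tracked in the checkpoint table:

```typescript
// Compute processing rate and a rough ETA from progress counters.
function progressStats(
  processed: number,
  total: number,
  elapsedMs: number,
): { ratePerSec: number; etaSec: number | null } {
  if (elapsedMs <= 0 || processed <= 0) return { ratePerSec: 0, etaSec: null };
  const ratePerSec = processed / (elapsedMs / 1000);
  const remaining = Math.max(0, total - processed);
  return { ratePerSec, etaSec: remaining / ratePerSec };
}
```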
Monitoring Queries
-- Check backfill progress
SELECT
source_id,
source_type,
status,
total_processed,
total_successful,
total_failed,
last_checkpoint_at,
NOW() - last_checkpoint_at as time_since_last_checkpoint
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS';
-- Check for stalled backfills
SELECT *
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS'
AND last_checkpoint_at < NOW() - INTERVAL '1 hour';
-- Verify backfill completion
SELECT
COUNT(*) as total_entries,
MIN(timestamp_utc) as earliest,
MAX(timestamp_utc) as latest
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';
Rollback Procedures
Scenario 1: Rollback Before Constraints Applied
If constraints have not been applied, rollback is straightforward:
-- Remove backfilled entries
DELETE FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';
-- Remove outbox records
DELETE FROM dual_ledger_outbox
WHERE reference_id LIKE 'BACKFILL-%';
-- Reset balances (if needed)
UPDATE bank_accounts
SET balance = balance - (
SELECT COALESCE(SUM(amount), 0)
FROM ledger_entries
WHERE debit_account_id = bank_accounts.id
AND reference_id LIKE 'BACKFILL-%'
) + (
SELECT COALESCE(SUM(amount), 0)
FROM ledger_entries
WHERE credit_account_id = bank_accounts.id
AND reference_id LIKE 'BACKFILL-%'
);
Scenario 2: Rollback After Constraints Applied
If constraints have been applied, rollback is more complex:
- Temporarily disable constraints
- Remove backfilled entries
- Recalculate balances
- Re-enable constraints
- Verify balance consistency
Note: This should only be done during a maintenance window.
Troubleshooting
Common Issues
1. Duplicate Reference ID Errors
Symptom: unique_violation error on reference_id
Solution:
- Check if entries were already backfilled
- Use skipDuplicates: true to skip existing entries
- Or regenerate reference IDs for duplicates
2. Balance Inconsistencies
Symptom: Balance validation fails
Solution:
- Identify accounts with inconsistent balances
- Generate correction entries
- Post corrections before applying constraints
3. Slow Performance
Symptom: Backfill processing is slow
Solution:
- Increase batch size (if memory allows)
- Use parallel processing
- Optimize database indexes
- Run during off-peak hours
4. Out of Memory
Symptom: Process runs out of memory
Solution:
- Reduce batch size
- Process sequentially instead of parallel
- Use streaming instead of loading all data
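The streaming advice can be implemented with an async generator that yields fixed-size batches from any async iterable (for example, csv-parse's streaming parser), so only one batch is held in memory at a time. A sketch:

```typescript
// Yield fixed-size batches from an async (or sync) iterable without
// buffering the whole source in memory.
async function* batches<T>(
  source: AsyncIterable<T> | Iterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // flush the final partial batch
}
```

The batch loop in Step 3 could then consume `for await (const batch of batches(source, config.batchSize))` instead of loading the full file up front.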
Examples
Example 1: CSV Backfill
# Configure environment
export DATABASE_URL="postgresql://user:password@host:port/database"
export BACKFILL_SOURCE_FILE="ledger_export.csv"
export LEDGER_ID="MASTER"
export BATCH_SIZE="1000"
export SKIP_DUPLICATES="true"
# Run backfill script
cd dbis_core
ts-node scripts/backfill-ledger.ts
Example 2: SCB Ledger Sync
// Backfill from SCB ledger via API
async function backfillFromSCB(sovereignBankId: string, startDate: Date, endDate: Date) {
const scbApi = new SCBLedgerAPI(sovereignBankId);
const entries = await scbApi.getLedgerEntries(startDate, endDate);
for (const entry of entries) {
// Transform SCB entry to DBIS format
const dbisEntry = transformSCBEntry(entry);
// Post to DBIS (will create outbox record for dual-ledger sync)
await ledgerPostingModule.postEntry(dbisEntry);
}
}
References
- Migration Files: dbis_core/db/migrations/
- Ledger Posting Module: dbis_core/src/core/ledger/ledger-posting.module.ts
- Atomic Posting Function: dbis_core/db/migrations/005_post_ledger_entry.sql
- Block Indexer Backfill: explorer-monorepo/backend/indexer/backfill/backfill.go (reference pattern)