# Ledger Backfill Strategy

**Version**: 1.0.0
**Last Updated**: 2025-01-20
**Status**: Active Documentation

## Overview

This document outlines the strategy for backfilling historical ledger data into the DBIS Core Banking System ledger. The backfill process ensures data integrity, maintains idempotency, and supports resumable operations.

---

## Backfill Scenarios

### Scenario 1: Initial System Setup (Empty Ledger)

**Use Case**: Setting up a new DBIS instance with an empty ledger, populating from an external source (e.g., legacy system, CSV export, external API).

**Approach**:

1. Validate source data integrity
2. Transform source data to DBIS ledger format
3. Batch insert with idempotency checks
4. Verify balance consistency
5. Apply constraints after backfill

### Scenario 2: Schema Migration (Existing Ledger Data)

**Use Case**: Migrating existing ledger data to a new schema (e.g., adding new fields, restructuring).

**Approach**:

1. Audit existing data
2. Transform to the new schema format
3. Migrate in batches
4. Verify data integrity
5. Update schema constraints

### Scenario 3: Data Reconciliation (Fix Inconsistencies)

**Use Case**: Fixing inconsistent balances or missing entries discovered during an audit.

**Approach**:

1. Identify inconsistencies
2. Generate correction entries
3. Apply corrections via the normal posting function
4. Verify balance consistency
5. Document corrections in the audit log

### Scenario 4: Dual-Ledger Sync (SCB Ledger Backfill)

**Use Case**: Backfilling historical entries from the SCB (Sovereign Central Bank) ledger to DBIS.

**Approach**:

1. Extract entries from the SCB ledger
2. Transform to DBIS format
3. Post to DBIS via the outbox pattern
4. Track sync status
5. Verify dual-ledger consistency

---

## Backfill Architecture

### Component Overview

```
┌──────────────────────────────────────────────────────────────┐
│                    Backfill Architecture                     │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌─────────────┐    ┌──────────────┐    ┌─────────────┐     │
│   │   Source    │───▶│  Transform   │───▶│  Validate   │     │
│   │   Reader    │    │   Service    │    │   Service   │     │
│   └─────────────┘    └──────────────┘    └─────────────┘     │
│                                                 │            │
│                                                 ▼            │
│                                         ┌──────────────┐     │
│                                         │    Batch     │     │
│                                         │  Processor   │     │
│                                         └──────────────┘     │
│                                                 │            │
│                     ┌───────────────────────────┤            │
│                     ▼                           ▼            │
│             ┌──────────────┐            ┌──────────────┐     │
│             │    Ledger    │            │  Checkpoint  │     │
│             │   Posting    │            │   Service    │     │
│             │    Module    │            └──────────────┘     │
│             └──────────────┘                                 │
│                     │                                        │
│                     ▼                                        │
│             ┌──────────────┐                                 │
│             │   Audit &    │                                 │
│             │ Verification │                                 │
│             └──────────────┘                                 │
└──────────────────────────────────────────────────────────────┘
```

### Key Components

1. **Source Reader**: Reads data from the source (CSV, API, database, etc.)
2. **Transform Service**: Transforms source data to DBIS ledger format
3. **Validate Service**: Validates entries before posting
4. **Batch Processor**: Processes entries in batches with checkpointing
5. **Ledger Posting Module**: Uses the atomic posting function for entries
6. **Checkpoint Service**: Tracks progress for resumable backfill
7.
**Audit & Verification**: Validates backfill results

---

## Backfill Process

### Step 1: Pre-Backfill Preparation

#### 1.1 Audit Existing Data

Before starting the backfill, audit existing data:

```sql
-- Check for existing ledger entries
SELECT COUNT(*), MIN(timestamp_utc), MAX(timestamp_utc)
FROM ledger_entries;

-- Check for inconsistent balances
SELECT id, balance, available_balance, reserved_balance
FROM bank_accounts
WHERE available_balance < 0
   OR reserved_balance < 0
   OR available_balance > balance
   OR (available_balance + reserved_balance) > balance;

-- Check for duplicate reference IDs
SELECT ledger_id, reference_id, COUNT(*)
FROM ledger_entries
GROUP BY ledger_id, reference_id
HAVING COUNT(*) > 1;
```

#### 1.2 Verify Schema

Ensure all required migrations are applied:

```sql
-- Verify the idempotency constraint exists
SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_name = 'ledger_entries'
  AND constraint_name LIKE '%reference%';

-- Verify the outbox table exists
SELECT COUNT(*) FROM dual_ledger_outbox;

-- Verify the posting function exists
SELECT routine_name
FROM information_schema.routines
WHERE routine_name = 'post_ledger_entry';
```

#### 1.3 Prepare Source Data

- **CSV Export**: Ensure the format matches the expected schema
- **API Extraction**: Configure API endpoints and authentication
- **Database Extraction**: Set up the connection and query filters
- **Legacy System**: Configure the export format and mapping

---

### Step 2: Data Transformation

#### 2.1 Source Data Format

Source data should be transformed to this format:

```typescript
interface LedgerEntrySource {
  ledgerId: string;                   // e.g., "MASTER", "SOVEREIGN"
  debitAccountId: string;             // Account ID
  creditAccountId: string;            // Account ID
  amount: string;                     // Decimal as string (e.g., "1000.00")
  currencyCode: string;               // ISO 4217 (e.g., "USD")
  assetType: string;                  // "fiat", "cbdc", "commodity", "security"
  transactionType: string;            // Transaction type classification
  referenceId: string;                // Unique reference ID (required for idempotency)
  timestampUtc?: string;              // ISO 8601 timestamp
  fxRate?: string;                    // FX rate if applicable
  metadata?: Record<string, unknown>; // Additional metadata
}
```

#### 2.2 Transformation Rules

1. **Account ID Mapping**: Map source account identifiers to DBIS account IDs
2. **Amount Normalization**: Convert amounts to the standard format (decimal string)
3. **Currency Validation**: Validate currency codes against ISO 4217
4. **Timestamp Normalization**: Convert timestamps to UTC ISO 8601 format
5. **Reference ID Generation**: Generate unique reference IDs if not present
6. **Metadata Extraction**: Extract relevant metadata from the source

#### 2.3 Example Transformation Script

```typescript
// Example: Transform CSV data
function transformCSVToLedgerEntry(csvRow: CSVRow): LedgerEntrySource {
  return {
    ledgerId: csvRow.ledger || 'MASTER',
    debitAccountId: mapAccountId(csvRow.debit_account),
    creditAccountId: mapAccountId(csvRow.credit_account),
    amount: normalizeAmount(csvRow.amount),
    currencyCode: csvRow.currency || 'USD',
    assetType: csvRow.asset_type || 'fiat',
    transactionType: mapTransactionType(csvRow.txn_type),
    referenceId: csvRow.reference_id || generateReferenceId(csvRow),
    timestampUtc: csvRow.timestamp || new Date().toISOString(),
    fxRate: csvRow.fx_rate || undefined,
    metadata: extractMetadata(csvRow),
  };
}
```

---

### Step 3: Batch Processing

#### 3.1 Batch Configuration

Configure batch processing parameters:

```typescript
interface BackfillConfig {
  batchSize: number;          // Entries per batch (default: 1000)
  checkpointInterval: number; // Checkpoint every N batches (default: 10)
  maxRetries: number;         // Max retries per batch (default: 3)
  retryDelay: number;         // Initial retry delay in ms (default: 1000)
  parallelWorkers: number;    // Number of parallel workers (default: 1)
  skipDuplicates: boolean;    // Skip entries with duplicate reference IDs (default: true)
  validateBalances: boolean;  // Validate balances after each batch (default: true)
}
```

#### 3.2 Checkpointing Strategy

Use
checkpointing to enable a resumable backfill:

```sql
-- Create checkpoint table for ledger backfill
CREATE TABLE IF NOT EXISTS ledger_backfill_checkpoints (
  id SERIAL PRIMARY KEY,
  source_id VARCHAR(255) NOT NULL,
  source_type VARCHAR(50) NOT NULL,         -- 'CSV', 'API', 'DATABASE', 'SCB'
  last_processed_id VARCHAR(255),
  last_processed_timestamp TIMESTAMP,
  total_processed BIGINT DEFAULT 0,
  total_successful BIGINT DEFAULT 0,
  total_failed BIGINT DEFAULT 0,
  status VARCHAR(50) DEFAULT 'IN_PROGRESS', -- 'IN_PROGRESS', 'COMPLETED', 'FAILED', 'PAUSED'
  started_at TIMESTAMP DEFAULT NOW(),
  last_checkpoint_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  error_message TEXT,
  metadata JSONB,
  UNIQUE(source_id, source_type)
);

CREATE INDEX idx_backfill_checkpoints_status ON ledger_backfill_checkpoints(status);
CREATE INDEX idx_backfill_checkpoints_source ON ledger_backfill_checkpoints(source_id, source_type);
```

#### 3.3 Batch Processing Loop

```typescript
async function processBackfill(
  source: DataSource,
  config: BackfillConfig
): Promise<BackfillResult> {
  const checkpoint = await loadCheckpoint(source.id, source.type);

  let processed = 0;
  let successful = 0;
  let failed = 0;
  let lastProcessedId: string | null = null;
  let lastProcessedTimestamp: Date | null = null;

  while (true) {
    // Load a batch from the source (starting from the checkpoint)
    const batch = await source.readBatch({
      startId: checkpoint?.lastProcessedId,
      startTimestamp: checkpoint?.lastProcessedTimestamp,
      limit: config.batchSize,
    });

    if (batch.length === 0) {
      break; // No more data
    }

    // Process the batch
    const results = await processBatch(batch, config);

    // Update counters
    processed += batch.length;
    successful += results.successful;
    failed += results.failed;

    // Update the checkpoint
    lastProcessedId = batch[batch.length - 1].id;
    lastProcessedTimestamp = batch[batch.length - 1].timestamp;

    await saveCheckpoint({
      sourceId: source.id,
      sourceType: source.type,
      lastProcessedId,
      lastProcessedTimestamp,
      totalProcessed: processed,
      totalSuccessful: successful,
      totalFailed: failed,
      status: 'IN_PROGRESS',
    });

    // Validate balances if configured
    if (config.validateBalances && processed % (config.checkpointInterval * config.batchSize) === 0) {
      await validateBalances();
    }
  }

  // Mark as completed
  await saveCheckpoint({
    sourceId: source.id,
    sourceType: source.type,
    status: 'COMPLETED',
    completedAt: new Date(),
  });

  return {
    totalProcessed: processed,
    totalSuccessful: successful,
    totalFailed: failed,
  };
}
```

---

### Step 4: Entry Posting

#### 4.1 Use the Atomic Posting Function

Always use the atomic posting function for backfill entries:

```typescript
async function postBackfillEntry(
  entry: LedgerEntrySource,
  config: BackfillConfig
): Promise<void> {
  try {
    // Use the atomic posting function via SQL.
    // $queryRaw (not $executeRaw) returns the rows of a SELECT.
    const rows = await prisma.$queryRaw`
      SELECT post_ledger_entry(
        ${entry.ledgerId}::TEXT,
        ${entry.debitAccountId}::TEXT,
        ${entry.creditAccountId}::TEXT,
        ${entry.amount}::NUMERIC,
        ${entry.currencyCode}::TEXT,
        ${entry.assetType}::TEXT,
        ${entry.transactionType}::TEXT,
        ${entry.referenceId}::TEXT,
        ${entry.fxRate || null}::NUMERIC,
        ${entry.metadata ? JSON.stringify(entry.metadata) : null}::JSONB
      )
    `;

    // Verify the function returned a result row
    if (!Array.isArray(rows) || rows.length === 0) {
      throw new Error('Failed to post ledger entry');
    }
  } catch (error: any) {
    // Handle idempotency violation (duplicate reference ID)
    if (error.code === '23505' || error.message?.includes('duplicate')) {
      // Skip duplicate entries if configured
      if (config.skipDuplicates) {
        return; // Entry already exists, skip
      }
      throw new Error(`Duplicate reference ID: ${entry.referenceId}`);
    }
    throw error;
  }
}
```

#### 4.2 Batch Posting

Post entries in batches for efficiency:

```typescript
async function processBatch(
  entries: LedgerEntrySource[],
  config: BackfillConfig
): Promise<{ successful: number; failed: number }> {
  let successful = 0;
  let failed = 0;

  // Process in parallel if configured
  if (config.parallelWorkers > 1) {
    // Split the batch into one chunk per worker
    const chunkSize = Math.ceil(entries.length / config.parallelWorkers);
    const chunks = chunkArray(entries, chunkSize);

    // processChunk posts each entry of a chunk sequentially, as in the else branch
    const results = await Promise.allSettled(
      chunks.map((chunk) => processChunk(chunk, config))
    );

    results.forEach((result, i) => {
      if (result.status === 'fulfilled') {
        successful += result.value.successful;
        failed += result.value.failed;
      } else {
        failed += chunks[i].length; // Count only the rejected chunk, not the whole batch
      }
    });
  } else {
    // Sequential processing
    for (const entry of entries) {
      try {
        await postBackfillEntry(entry, config);
        successful++;
      } catch (error) {
        failed++;
        logError(entry, error);
      }
    }
  }

  return { successful, failed };
}
```

---

### Step 5: Balance Constraints Application

#### 5.1 Pre-Constraint Validation

Before applying balance constraints, validate all balances:

```sql
-- Validate that all balances are consistent
DO $$
DECLARE
  inconsistent_count INTEGER;
BEGIN
  SELECT COUNT(*) INTO inconsistent_count
  FROM bank_accounts
  WHERE available_balance < 0
     OR reserved_balance < 0
     OR available_balance > balance
     OR (available_balance + reserved_balance) > balance;

  IF inconsistent_count > 0 THEN
    RAISE EXCEPTION 'Found % inconsistent balances. Fix before applying constraints.', inconsistent_count;
  END IF;
END $$;
```

#### 5.2 Apply Constraints

After the backfill completes and balances are validated, apply constraints:

```bash
# Apply the balance constraints migration
psql "$DATABASE_URL" -f db/migrations/004_balance_constraints.sql
```

#### 5.3 Post-Constraint Verification

Verify the constraints are applied correctly:

```sql
-- Check that the constraints exist
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'bank_accounts'
  AND constraint_name LIKE '%balance%';

-- Verify the constraint is enforced: run inside a transaction and ROLLBACK.
-- This UPDATE should fail if the constraints are working.
UPDATE bank_accounts
SET available_balance = -1
WHERE id = (SELECT id FROM bank_accounts LIMIT 1);
```

---

### Step 6: Verification and Reconciliation

#### 6.1 Entry Verification

Verify all entries were posted correctly:

```sql
-- Compare source count vs. posted count
SELECT
  COUNT(*) as total_entries,
  COUNT(DISTINCT reference_id) as unique_references,
  COUNT(DISTINCT ledger_id) as unique_ledgers,
  MIN(timestamp_utc) as earliest_entry,
  MAX(timestamp_utc) as latest_entry
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%'; -- If using a prefix for backfill entries

-- Check for missing entries
SELECT source_id, reference_id
FROM backfill_source_data
WHERE NOT EXISTS (
  SELECT 1 FROM ledger_entries
  WHERE reference_id = backfill_source_data.reference_id
);
```

#### 6.2 Balance Reconciliation

Reconcile balances after the backfill. Since `ledger_entries` stores `debit_account_id` and `credit_account_id` rather than a single `account_id`/`side` pair, compute the expected balance by unioning both sides of each entry:

```sql
-- Compare expected vs. actual balances
SELECT
  account_id,
  expected_balance,
  actual_balance,
  (expected_balance - actual_balance) as difference
FROM (
  SELECT
    account_id,
    SUM(signed_amount) as expected_balance,
    (SELECT balance FROM bank_accounts WHERE id = account_id) as actual_balance
  FROM (
    SELECT debit_account_id AS account_id, amount AS signed_amount FROM ledger_entries
    UNION ALL
    SELECT credit_account_id AS account_id, -amount AS signed_amount FROM ledger_entries
  ) sides
  WHERE account_id IN (SELECT id FROM bank_accounts)
  GROUP BY account_id
) reconciliation
WHERE ABS(expected_balance - actual_balance) > 0.01; -- Allow small rounding differences
```

#### 6.3 Dual-Ledger Reconciliation

If backfilling from the SCB ledger, reconcile dual-ledger consistency:

```sql
-- Check outbox sync status
SELECT
  status,
  COUNT(*) as count,
  MIN(created_at) as oldest,
  MAX(created_at) as newest
FROM dual_ledger_outbox
WHERE created_at >= (
  SELECT MIN(timestamp_utc) FROM ledger_entries WHERE reference_id LIKE 'BACKFILL-%'
)
GROUP BY status;

-- Verify all entries have corresponding outbox records (for SCB sync)
SELECT le.id, le.reference_id
FROM ledger_entries le
WHERE le.reference_id LIKE 'BACKFILL-%'
  AND NOT EXISTS (
    SELECT 1 FROM dual_ledger_outbox dlo
    WHERE dlo.reference_id = le.reference_id
  );
```

---

## Implementation Scripts

### TypeScript Backfill Script

**File**: `dbis_core/scripts/backfill-ledger.ts`

```typescript
#!/usr/bin/env ts-node
import { PrismaClient } from '@prisma/client';
import { readFileSync } from 'fs';
import { parse } from 'csv-parse/sync';

const prisma = new PrismaClient();

interface BackfillConfig {
  sourceFile: string;
  ledgerId: string;
  batchSize: number;
  skipDuplicates: boolean;
}

async function backfillFromCSV(config: BackfillConfig) {
  // Read and parse the CSV
  const csvData = readFileSync(config.sourceFile, 'utf-8');
  const records = parse(csvData, {
    columns: true,
    skip_empty_lines: true,
  });

  let processed = 0;
  let successful = 0;
  let failed = 0;

  // Process in batches
  for (let i = 0; i < records.length; i += config.batchSize) {
    const batch = records.slice(i, i + config.batchSize);

    for (const record of batch) {
      processed++; // Count every record, including skipped duplicates
      try {
        // Transform and post the entry
        await prisma.$executeRaw`
          SELECT post_ledger_entry(
            ${config.ledgerId}::TEXT,
            ${record.debitAccountId}::TEXT,
            ${record.creditAccountId}::TEXT,
            ${record.amount}::NUMERIC,
            ${record.currencyCode}::TEXT,
            ${record.assetType || 'fiat'}::TEXT,
            ${record.transactionType}::TEXT,
            ${record.referenceId}::TEXT,
            ${record.fxRate || null}::NUMERIC,
            ${record.metadata ? JSON.stringify(JSON.parse(record.metadata)) : null}::JSONB
          )
        `;
        successful++;
      } catch (error: any) {
        if (config.skipDuplicates && error.code === '23505') {
          // Skip duplicates
          continue;
        }
        failed++;
        console.error(`Failed to post entry ${record.referenceId}:`, error.message);
      }
    }

    console.log(`Processed ${processed}/${records.length} entries (${successful} successful, ${failed} failed)`);
  }

  return { processed, successful, failed };
}

// CLI entry point
const config: BackfillConfig = {
  sourceFile: process.env.BACKFILL_SOURCE_FILE || 'backfill.csv',
  ledgerId: process.env.LEDGER_ID || 'MASTER',
  batchSize: parseInt(process.env.BATCH_SIZE || '1000', 10),
  skipDuplicates: process.env.SKIP_DUPLICATES === 'true',
};

backfillFromCSV(config)
  .then(async (result) => {
    console.log('Backfill completed:', result);
    // Disconnect before exiting; process.exit() would skip a trailing .finally()
    await prisma.$disconnect();
    process.exit(0);
  })
  .catch(async (error) => {
    console.error('Backfill failed:', error);
    await prisma.$disconnect();
    process.exit(1);
  });
```

### SQL Backfill Script

**File**: `dbis_core/scripts/backfill-ledger.sql`

```sql
-- Ledger Backfill Script
-- Use this for a direct SQL-based backfill from another database
-- Example: Backfill from an external ledger_entries_legacy table

DO $$
DECLARE
  batch_size INTEGER := 1000;
  processed INTEGER := 0;
  successful INTEGER := 0;
  skipped INTEGER := 0;
  failed INTEGER := 0;
  entry RECORD;
BEGIN
  FOR entry IN SELECT * FROM ledger_entries_legacy ORDER BY id LOOP
    BEGIN
      -- Post the entry using the atomic function
      PERFORM post_ledger_entry(
        entry.ledger_id::TEXT,
        entry.debit_account_id::TEXT,
        entry.credit_account_id::TEXT,
        entry.amount::NUMERIC,
        entry.currency_code::TEXT,
        entry.asset_type::TEXT,
        entry.transaction_type::TEXT,
        entry.reference_id::TEXT,
        entry.fx_rate::NUMERIC,
        entry.metadata::JSONB
      );
      successful := successful + 1;
    EXCEPTION
      WHEN unique_violation THEN
        -- Duplicate reference ID: already backfilled, so skip rather than fail
        skipped := skipped + 1;
        RAISE NOTICE 'Skipping duplicate reference ID: %', entry.reference_id;
      WHEN OTHERS THEN
        failed := failed + 1;
        RAISE NOTICE 'Error processing entry %: %', entry.reference_id, SQLERRM;
    END;

    processed := processed + 1;

    -- Progress notice every batch_size entries
    IF processed % batch_size = 0 THEN
      RAISE NOTICE 'Processed % entries (% successful, % skipped, % failed)',
        processed, successful, skipped, failed;
    END IF;
  END LOOP;

  RAISE NOTICE 'Backfill completed: % total, % successful, % skipped, % failed',
    processed, successful, skipped, failed;
END $$;
```

---

## Best Practices

### 1. Idempotency

- Always use a unique `reference_id` for each entry
- Use the atomic posting function, which enforces idempotency
- Skip duplicates during backfill if they already exist

### 2. Checkpointing

- Save a checkpoint after each batch
- Enable resumable backfill from the last checkpoint
- Track progress with metrics (processed, successful, failed)

### 3. Validation

- Validate source data before transformation
- Validate transformed entries before posting
- Verify balances after backfill completion
- Reconcile with the source system if possible

### 4. Error Handling

- Log all errors with full context
- Retry transient errors with exponential backoff
- Skip permanent errors (e.g., duplicate reference IDs)
- Generate an error report after completion

### 5. Performance

- Process in batches (1,000-10,000 entries per batch)
- Use parallel processing for large backfills
- Monitor database performance during the backfill
- Schedule during low-traffic periods

### 6. Testing

- Test the backfill process on a staging environment first
- Use a small test dataset to verify the transformation
- Verify balances match expected values
- Test rollback procedures if needed

---

## Monitoring and Metrics

### Key Metrics to Track

1. **Progress Metrics**:
   - Total entries to process
   - Entries processed
   - Entries successful
   - Entries failed
   - Processing rate (entries/second)

2. **Performance Metrics**:
   - Batch processing time
   - Database query time
   - Checkpoint save time
   - Total elapsed time

3. **Quality Metrics**:
   - Duplicate entries skipped
   - Validation errors
   - Balance inconsistencies
   - Reconciliation mismatches

### Monitoring Queries

```sql
-- Check backfill progress
SELECT
  source_id,
  source_type,
  status,
  total_processed,
  total_successful,
  total_failed,
  last_checkpoint_at,
  NOW() - last_checkpoint_at as time_since_last_checkpoint
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS';

-- Check for stalled backfills
SELECT *
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS'
  AND last_checkpoint_at < NOW() - INTERVAL '1 hour';

-- Verify backfill completion
SELECT
  COUNT(*) as total_entries,
  MIN(timestamp_utc) as earliest,
  MAX(timestamp_utc) as latest
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';
```

---

## Rollback Procedures

### Scenario 1: Rollback Before Constraints Applied

If constraints have not been applied, rollback is straightforward. Run the statements in a single transaction, and reset the balances *before* deleting the entries, since the reset sums over the backfilled rows:

```sql
-- Reset balances first, while the backfilled entries are still present
UPDATE bank_accounts
SET balance = balance
  - (
    SELECT COALESCE(SUM(amount), 0)
    FROM ledger_entries
    WHERE debit_account_id = bank_accounts.id
      AND reference_id LIKE 'BACKFILL-%'
  )
  + (
    SELECT COALESCE(SUM(amount), 0)
    FROM ledger_entries
    WHERE credit_account_id = bank_accounts.id
      AND reference_id LIKE 'BACKFILL-%'
  );

-- Remove backfilled entries
DELETE FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';

-- Remove outbox records
DELETE FROM dual_ledger_outbox
WHERE reference_id LIKE 'BACKFILL-%';
```

### Scenario 2: Rollback After Constraints Applied
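The rollback wraps the Scenario 1 statements in a transaction that drops and re-creates the balance constraints. The sketch below is illustrative only: the constraint name `chk_balance_consistency` and its CHECK expression are placeholders, not the actual definitions; take both from `db/migrations/004_balance_constraints.sql`.

```sql
-- Illustrative sketch: constraint name and CHECK expression are placeholders;
-- copy the real definitions from 004_balance_constraints.sql before running.
BEGIN;

-- Temporarily disable the balance constraints
ALTER TABLE bank_accounts DROP CONSTRAINT IF EXISTS chk_balance_consistency;

-- Remove backfilled entries and recalculate balances
-- (same statements as in Scenario 1)

-- Re-enable the constraints
ALTER TABLE bank_accounts ADD CONSTRAINT chk_balance_consistency
  CHECK (
    available_balance >= 0
    AND reserved_balance >= 0
    AND (available_balance + reserved_balance) <= balance
  );

COMMIT;
```

In PostgreSQL, `ADD CONSTRAINT ... CHECK` scans the table and fails if any existing row violates the check, so re-enabling the constraints doubles as the final balance-consistency verification.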
If constraints have been applied, rollback is more complex:

1. Temporarily disable constraints
2. Remove backfilled entries
3. Recalculate balances
4. Re-enable constraints
5. Verify balance consistency

**Note**: This should only be done during a maintenance window.

---

## Troubleshooting

### Common Issues

#### 1. Duplicate Reference ID Errors

**Symptom**: `unique_violation` error on `reference_id`

**Solution**:

- Check whether the entries were already backfilled
- Use `skipDuplicates: true` to skip existing entries
- Or regenerate reference IDs for the duplicates

#### 2. Balance Inconsistencies

**Symptom**: Balance validation fails

**Solution**:

- Identify accounts with inconsistent balances
- Generate correction entries
- Post corrections before applying constraints

#### 3. Slow Performance

**Symptom**: Backfill processing is slow

**Solution**:

- Increase the batch size (if memory allows)
- Use parallel processing
- Optimize database indexes
- Run during off-peak hours

#### 4. Out of Memory

**Symptom**: The process runs out of memory

**Solution**:

- Reduce the batch size
- Process sequentially instead of in parallel
- Use streaming instead of loading all data

---

## Examples

### Example 1: CSV Backfill

```bash
# Configure the environment
export DATABASE_URL="postgresql://user:password@host:port/database"
export BACKFILL_SOURCE_FILE="ledger_export.csv"
export LEDGER_ID="MASTER"
export BATCH_SIZE="1000"
export SKIP_DUPLICATES="true"

# Run the backfill script
cd dbis_core
ts-node scripts/backfill-ledger.ts
```

### Example 2: SCB Ledger Sync

```typescript
// Backfill from the SCB ledger via API
async function backfillFromSCB(sovereignBankId: string, startDate: Date, endDate: Date) {
  const scbApi = new SCBLedgerAPI(sovereignBankId);
  const entries = await scbApi.getLedgerEntries(startDate, endDate);

  for (const entry of entries) {
    // Transform the SCB entry to DBIS format
    const dbisEntry = transformSCBEntry(entry);

    // Post to DBIS (will create an outbox record for dual-ledger sync)
    await ledgerPostingModule.postEntry(dbisEntry);
  }
}
```

---

## References

- Migration Files: `dbis_core/db/migrations/`
- Ledger Posting Module: `dbis_core/src/core/ledger/ledger-posting.module.ts`
- Atomic Posting Function: `dbis_core/db/migrations/005_post_ledger_entry.sql`
- Block Indexer Backfill: `explorer-monorepo/backend/indexer/backfill/backfill.go` (reference pattern)
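---

## Appendix: Helper Sketches

The batch-processing examples above call a `chunkArray` helper and recommend retrying transient errors with exponential backoff, but neither helper is defined in the referenced code. The sketch below shows one plausible implementation; the names `chunkArray` and `withRetry` and their signatures are assumptions for illustration, not confirmed DBIS APIs.

```typescript
/** Split an array into chunks of at most `size` elements. */
function chunkArray<T>(items: T[], size: number): T[][] {
  if (size < 1) throw new RangeError('chunk size must be >= 1');
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

/** Retry an async operation with exponential backoff (for transient errors). */
async function withRetry<T>(
  op: () => Promise<T>,
  maxRetries = 3,        // mirrors the BackfillConfig.maxRetries default
  initialDelayMs = 1000  // mirrors the BackfillConfig.retryDelay default
): Promise<T> {
  let delay = initialDelayMs;
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (error) {
      if (attempt >= maxRetries) throw error;
      await new Promise((resolve) => setTimeout(resolve, delay));
      delay *= 2; // double the delay after each failed attempt
    }
  }
}
```

For example, `chunkArray(entries, Math.ceil(entries.length / config.parallelWorkers))` yields one chunk per worker, and `withRetry(() => postBackfillEntry(entry, config))` retries transient posting failures before giving up.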