File: dbis_core/db/migrations/BACKFILL_STRATEGY.md

Ledger Backfill Strategy

Version: 1.0.0
Last Updated: 2025-01-20
Status: Active Documentation

Overview

This document outlines the strategy for backfilling historical ledger data into the DBIS Core Banking System ledger. The backfill process ensures data integrity, maintains idempotency, and supports resumable operations.


Backfill Scenarios

Scenario 1: Initial System Setup (Empty Ledger)

Use Case: Setting up a new DBIS instance with an empty ledger, populating it from an external source (e.g., legacy system, CSV export, external API).

Approach:

  1. Validate source data integrity
  2. Transform source data to DBIS ledger format
  3. Batch insert with idempotency checks
  4. Verify balance consistency
  5. Apply constraints after backfill

Scenario 2: Schema Migration (Existing Ledger Data)

Use Case: Migrating existing ledger data to new schema (e.g., adding new fields, restructuring).

Approach:

  1. Audit existing data
  2. Transform to new schema format
  3. Migrate in batches
  4. Verify data integrity
  5. Update schema constraints

Scenario 3: Data Reconciliation (Fix Inconsistencies)

Use Case: Fixing inconsistent balances or missing entries discovered during audit.

Approach:

  1. Identify inconsistencies
  2. Generate correction entries
  3. Apply corrections via normal posting function
  4. Verify balance consistency
  5. Document corrections in audit log

Scenario 4: Dual-Ledger Sync (SCB Ledger Backfill)

Use Case: Backfilling historical entries from SCB (Sovereign Central Bank) ledger to DBIS.

Approach:

  1. Extract entries from SCB ledger
  2. Transform to DBIS format
  3. Post to DBIS via outbox pattern
  4. Track sync status
  5. Verify dual-ledger consistency

Backfill Architecture

Component Overview

┌─────────────────────────────────────────────────────────────┐
│                    Backfill Architecture                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐     ┌──────────────┐     ┌─────────────┐ │
│  │   Source    │────▶│   Transform  │────▶│   Validate  │ │
│  │   Reader    │     │    Service   │     │   Service   │ │
│  └─────────────┘     └──────────────┘     └─────────────┘ │
│                                                             │
│                            │                                │
│                            ▼                                │
│                    ┌──────────────┐                         │
│                    │    Batch     │                         │
│                    │   Processor  │                         │
│                    └──────────────┘                         │
│                            │                                │
│                ┌───────────┴───────────┐                    │
│                ▼                       ▼                    │
│        ┌──────────────┐       ┌──────────────┐             │
│        │   Ledger     │       │   Checkpoint │             │
│        │   Posting    │       │    Service   │             │
│        │   Module     │       └──────────────┘             │
│        └──────────────┘                                    │
│                │                                            │
│                ▼                                            │
│        ┌──────────────┐                                    │
│        │   Audit &    │                                    │
│        │ Verification │                                    │
│        └──────────────┘                                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Key Components

  1. Source Reader: Reads data from source (CSV, API, database, etc.)
  2. Transform Service: Transforms source data to DBIS ledger format
  3. Validate Service: Validates entries before posting
  4. Batch Processor: Processes entries in batches with checkpointing
  5. Ledger Posting Module: Uses atomic posting function for entries
  6. Checkpoint Service: Tracks progress for resumable backfill
  7. Audit & Verification: Validates backfill results

Backfill Process

Step 1: Pre-Backfill Preparation

1.1 Audit Existing Data

Before starting backfill, audit existing data:

-- Check for existing ledger entries
SELECT COUNT(*), MIN(timestamp_utc), MAX(timestamp_utc)
FROM ledger_entries;

-- Check for inconsistent balances
SELECT id, balance, available_balance, reserved_balance
FROM bank_accounts
WHERE available_balance < 0
   OR reserved_balance < 0
   OR available_balance > balance
   OR (available_balance + reserved_balance) > balance;

-- Check for duplicate reference IDs
SELECT ledger_id, reference_id, COUNT(*)
FROM ledger_entries
GROUP BY ledger_id, reference_id
HAVING COUNT(*) > 1;

1.2 Verify Schema

Ensure all required migrations are applied:

-- Verify idempotency constraint exists
SELECT constraint_name 
FROM information_schema.table_constraints 
WHERE table_name = 'ledger_entries' 
  AND constraint_name LIKE '%reference%';

-- Verify outbox table exists
SELECT COUNT(*) FROM dual_ledger_outbox;

-- Verify posting function exists
SELECT routine_name 
FROM information_schema.routines 
WHERE routine_name = 'post_ledger_entry';

1.3 Prepare Source Data

  • CSV Export: Ensure format matches expected schema
  • API Extraction: Configure API endpoints and authentication
  • Database Extraction: Set up connection and query filters
  • Legacy System: Configure export format and mapping

Step 2: Data Transformation

2.1 Source Data Format

Source data should be transformed to this format:

interface LedgerEntrySource {
  ledgerId: string;           // e.g., "MASTER", "SOVEREIGN"
  debitAccountId: string;     // Account ID
  creditAccountId: string;    // Account ID
  amount: string;             // Decimal as string (e.g., "1000.00")
  currencyCode: string;       // ISO 4217 (e.g., "USD")
  assetType: string;          // "fiat", "cbdc", "commodity", "security"
  transactionType: string;    // Transaction type classification
  referenceId: string;        // Unique reference ID (required for idempotency)
  timestampUtc?: string;      // ISO 8601 timestamp
  fxRate?: string;            // FX rate if applicable
  metadata?: Record<string, unknown>; // Additional metadata
}

2.2 Transformation Rules

  1. Account ID Mapping: Map source account identifiers to DBIS account IDs
  2. Amount Normalization: Convert amounts to standard format (decimal string)
  3. Currency Validation: Validate currency codes against ISO 4217
  4. Timestamp Normalization: Convert timestamps to UTC ISO 8601 format
  5. Reference ID Generation: Generate unique reference IDs if not present
  6. Metadata Extraction: Extract relevant metadata from source

2.3 Example Transformation Script

// Example: Transform CSV data
function transformCSVToLedgerEntry(csvRow: CSVRow): LedgerEntrySource {
  return {
    ledgerId: csvRow.ledger || 'MASTER',
    debitAccountId: mapAccountId(csvRow.debit_account),
    creditAccountId: mapAccountId(csvRow.credit_account),
    amount: normalizeAmount(csvRow.amount),
    currencyCode: csvRow.currency || 'USD',
    assetType: csvRow.asset_type || 'fiat',
    transactionType: mapTransactionType(csvRow.txn_type),
    referenceId: csvRow.reference_id || generateReferenceId(csvRow),
    timestampUtc: csvRow.timestamp || new Date().toISOString(),
    fxRate: csvRow.fx_rate || undefined,
    metadata: extractMetadata(csvRow),
  };
}
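
The script above leans on helpers (mapAccountId, normalizeAmount, generateReferenceId, extractMetadata) that are not shown. Two hedged sketches follow, assuming amounts may carry thousands separators and that a deterministic reference ID should be derived from stable row fields (determinism is what makes re-runs idempotent):

```typescript
import { createHash } from 'node:crypto';

// Sketch of normalizeAmount: strip thousands separators and pin to two
// decimal places. Number is fine for illustration; a real implementation
// should use a decimal library to avoid floating-point loss on large amounts.
function normalizeAmount(raw: string): string {
  const cleaned = raw.replace(/,/g, '').trim();
  const value = Number(cleaned);
  if (!Number.isFinite(value)) {
    throw new Error(`Invalid amount: ${raw}`);
  }
  return value.toFixed(2);
}

// Sketch of generateReferenceId: hash stable row fields so re-running the
// backfill produces the same reference_id for the same source row.
function generateReferenceId(row: {
  debit_account: string;
  credit_account: string;
  amount: string;
  timestamp: string;
}): string {
  const digest = createHash('sha256')
    .update([row.debit_account, row.credit_account, row.amount, row.timestamp].join('|'))
    .digest('hex')
    .slice(0, 24);
  return `BACKFILL-${digest}`;
}
```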

Step 3: Batch Processing

3.1 Batch Configuration

Configure batch processing parameters:

interface BackfillConfig {
  batchSize: number;           // Entries per batch (default: 1000)
  checkpointInterval: number;  // Checkpoint every N batches (default: 10)
  maxRetries: number;          // Max retries per batch (default: 3)
  retryDelay: number;          // Initial retry delay in ms (default: 1000)
  parallelWorkers: number;     // Number of parallel workers (default: 1)
  skipDuplicates: boolean;     // Skip entries with duplicate reference IDs (default: true)
  validateBalances: boolean;   // Validate balances after each batch (default: true)
}

3.2 Checkpointing Strategy

Use checkpointing to enable resumable backfill:

-- Create checkpoint table for ledger backfill
CREATE TABLE IF NOT EXISTS ledger_backfill_checkpoints (
  id SERIAL PRIMARY KEY,
  source_id VARCHAR(255) NOT NULL,
  source_type VARCHAR(50) NOT NULL, -- 'CSV', 'API', 'DATABASE', 'SCB'
  last_processed_id VARCHAR(255),
  last_processed_timestamp TIMESTAMP,
  total_processed BIGINT DEFAULT 0,
  total_successful BIGINT DEFAULT 0,
  total_failed BIGINT DEFAULT 0,
  status VARCHAR(50) DEFAULT 'IN_PROGRESS', -- 'IN_PROGRESS', 'COMPLETED', 'FAILED', 'PAUSED'
  started_at TIMESTAMP DEFAULT NOW(),
  last_checkpoint_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  error_message TEXT,
  metadata JSONB,
  UNIQUE(source_id, source_type)
);

CREATE INDEX idx_backfill_checkpoints_status 
ON ledger_backfill_checkpoints(status);

CREATE INDEX idx_backfill_checkpoints_source 
ON ledger_backfill_checkpoints(source_id, source_type);

3.3 Batch Processing Loop

async function processBackfill(
  source: DataSource,
  config: BackfillConfig
): Promise<BackfillResult> {
  const checkpoint = await loadCheckpoint(source.id, source.type);
  let processed = 0;
  let successful = 0;
  let failed = 0;
  let lastProcessedId: string | null = null;
  let lastProcessedTimestamp: Date | null = null;

  while (true) {
    // Load next batch, resuming from the most recently processed entry.
    // Falling back to the stored checkpoint only on the first iteration
    // prevents re-reading the same batch forever.
    const batch = await source.readBatch({
      startId: lastProcessedId ?? checkpoint?.lastProcessedId,
      startTimestamp: lastProcessedTimestamp ?? checkpoint?.lastProcessedTimestamp,
      limit: config.batchSize,
    });

    if (batch.length === 0) {
      break; // No more data
    }

    // Process batch
    const results = await processBatch(batch, config);

    // Update counters
    processed += batch.length;
    successful += results.successful;
    failed += results.failed;

    // Update checkpoint
    lastProcessedId = batch[batch.length - 1].id;
    lastProcessedTimestamp = batch[batch.length - 1].timestamp;

    await saveCheckpoint({
      sourceId: source.id,
      sourceType: source.type,
      lastProcessedId,
      lastProcessedTimestamp,
      totalProcessed: processed,
      totalSuccessful: successful,
      totalFailed: failed,
      status: 'IN_PROGRESS',
    });

    // Validate balances if configured
    if (config.validateBalances && processed % (config.checkpointInterval * config.batchSize) === 0) {
      await validateBalances();
    }
  }

  // Mark as completed
  await saveCheckpoint({
    sourceId: source.id,
    sourceType: source.type,
    status: 'COMPLETED',
    completedAt: new Date(),
  });

  return {
    totalProcessed: processed,
    totalSuccessful: successful,
    totalFailed: failed,
  };
}

Step 4: Entry Posting

4.1 Use Atomic Posting Function

Always use the atomic posting function for backfill entries:

async function postBackfillEntry(
  entry: LedgerEntrySource,
  config: BackfillConfig
): Promise<void> {
  try {
    // Use the atomic posting function via SQL. $queryRaw (not $executeRaw)
    // because the function is invoked through SELECT and returns a row.
    await prisma.$queryRaw`
      SELECT post_ledger_entry(
        ${entry.ledgerId}::TEXT,
        ${entry.debitAccountId}::TEXT,
        ${entry.creditAccountId}::TEXT,
        ${entry.amount}::NUMERIC,
        ${entry.currencyCode}::TEXT,
        ${entry.assetType}::TEXT,
        ${entry.transactionType}::TEXT,
        ${entry.referenceId}::TEXT,
        ${entry.fxRate ?? null}::NUMERIC,
        ${entry.metadata ? JSON.stringify(entry.metadata) : null}::JSONB
      )
    `;
  } catch (error: any) {
    // Handle idempotency violation (duplicate reference ID)
    if (error.code === '23505' || error.message?.includes('duplicate')) {
      // Skip duplicate entries if configured
      if (config.skipDuplicates) {
        return; // Entry already exists, skip
      }
      throw new Error(`Duplicate reference ID: ${entry.referenceId}`);
    }

    throw error;
  }
}
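
Not every posting error deserves the same treatment. The sketch below classifies errors by Postgres SQLSTATE; the codes are standard Postgres values, but the three-way split is an assumption about this system's retry policy:

```typescript
// Classify a Postgres error by SQLSTATE. 23505 (unique_violation) means
// the entry already exists; class 40 (40001 serialization_failure, 40P01
// deadlock_detected) and class 08 (connection exceptions) are transient
// and safe to retry. Anything else is treated as permanent.
type ErrorKind = 'duplicate' | 'transient' | 'permanent';

function classifyPgError(code: string | undefined): ErrorKind {
  if (code === '23505') return 'duplicate';
  if (code && (code.startsWith('40') || code.startsWith('08'))) return 'transient';
  return 'permanent';
}
```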

4.2 Batch Posting

Post entries in batches for efficiency:

async function processBatch(
  entries: LedgerEntrySource[],
  config: BackfillConfig
): Promise<{ successful: number; failed: number }> {
  let successful = 0;
  let failed = 0;

  // Process in parallel if configured
  if (config.parallelWorkers > 1) {
    const chunks = chunkArray(entries, config.parallelWorkers);
    const results = await Promise.allSettled(
      chunks.map((chunk) => processChunk(chunk, config))
    );

    results.forEach((result, i) => {
      if (result.status === 'fulfilled') {
        successful += result.value.successful;
        failed += result.value.failed;
      } else {
        // Count only the entries in the failed chunk, not the whole batch
        failed += chunks[i].length;
      }
    });
  } else {
    // Sequential processing
    for (const entry of entries) {
      try {
        await postBackfillEntry(entry, config);
        successful++;
      } catch (error) {
        failed++;
        logError(entry, error);
      }
    }
  }

  return { successful, failed };
}
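
chunkArray is used above but never defined in this document. A minimal sketch that splits the batch into one contiguous chunk per worker:

```typescript
// Split `items` into at most `n` contiguous chunks of near-equal size,
// one per parallel worker. Preserves order within each chunk.
function chunkArray<T>(items: T[], n: number): T[][] {
  const size = Math.ceil(items.length / Math.max(1, n));
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```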

Step 5: Balance Constraints Application

5.1 Pre-Constraint Validation

Before applying balance constraints, validate all balances:

-- Validate all balances are consistent
DO $$
DECLARE
  inconsistent_count INTEGER;
BEGIN
  SELECT COUNT(*) INTO inconsistent_count
  FROM bank_accounts
  WHERE available_balance < 0
     OR reserved_balance < 0
     OR available_balance > balance
     OR (available_balance + reserved_balance) > balance;

  IF inconsistent_count > 0 THEN
    RAISE EXCEPTION 'Found % inconsistent balances. Fix before applying constraints.', inconsistent_count;
  END IF;
END $$;

5.2 Apply Constraints

After backfill completes and balances are validated, apply constraints:

# Apply balance constraints migration
psql $DATABASE_URL -f db/migrations/004_balance_constraints.sql

5.3 Post-Constraint Verification

Verify constraints are applied correctly:

-- Check constraint exists
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'bank_accounts'
  AND constraint_name LIKE '%balance%';

-- Verify the constraint is enforced: this UPDATE should fail with a
-- constraint violation. Run it inside a transaction so no data changes
-- even if the constraint turns out to be missing.
BEGIN;
UPDATE bank_accounts
SET available_balance = -1
WHERE id = (SELECT id FROM bank_accounts LIMIT 1);
ROLLBACK;

Step 6: Verification and Reconciliation

6.1 Entry Verification

Verify all entries were posted correctly:

-- Compare source count vs. posted count
SELECT 
  COUNT(*) as total_entries,
  COUNT(DISTINCT reference_id) as unique_references,
  COUNT(DISTINCT ledger_id) as unique_ledgers,
  MIN(timestamp_utc) as earliest_entry,
  MAX(timestamp_utc) as latest_entry
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%'; -- If using prefix for backfill entries

-- Check for missing entries
SELECT source_id, reference_id
FROM backfill_source_data
WHERE NOT EXISTS (
  SELECT 1 FROM ledger_entries
  WHERE reference_id = backfill_source_data.reference_id
);

6.2 Balance Reconciliation

Reconcile balances after backfill:

-- Compare expected vs. actual balances
SELECT 
  account_id,
  expected_balance,
  actual_balance,
  (expected_balance - actual_balance) as difference
FROM (
  SELECT 
    account_id,
    SUM(CASE WHEN side = 'debit' THEN amount ELSE -amount END) as expected_balance,
    (SELECT balance FROM bank_accounts WHERE id = account_id) as actual_balance
  FROM ledger_entries
  WHERE account_id IN (SELECT id FROM bank_accounts)
  GROUP BY account_id
) reconciliation
WHERE ABS(expected_balance - actual_balance) > 0.01; -- Allow small rounding differences
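
The same reconciliation can be replayed in application code against an entry export. The sign convention below (credit increases a balance, debit decreases it) is an assumption; flip it to match your chart of accounts:

```typescript
// Replay entries to compute an expected balance per account.
// Sign convention is an assumption: credit increases, debit decreases.
interface ReplayEntry {
  debitAccountId: string;
  creditAccountId: string;
  amount: string; // decimal string
}

function replayBalances(entries: ReplayEntry[]): Map<string, number> {
  const balances = new Map<string, number>();
  const add = (acct: string, delta: number) =>
    balances.set(acct, (balances.get(acct) ?? 0) + delta);
  for (const e of entries) {
    const amount = Number(e.amount);
    add(e.debitAccountId, -amount);
    add(e.creditAccountId, amount);
  }
  return balances;
}
```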

6.3 Dual-Ledger Reconciliation

If backfilling from SCB ledger, reconcile dual-ledger consistency:

-- Check outbox sync status
SELECT 
  status,
  COUNT(*) as count,
  MIN(created_at) as oldest,
  MAX(created_at) as newest
FROM dual_ledger_outbox
WHERE created_at >= (SELECT MIN(timestamp_utc) FROM ledger_entries WHERE reference_id LIKE 'BACKFILL-%')
GROUP BY status;

-- Verify all entries have corresponding outbox records (for SCB sync)
SELECT le.id, le.reference_id
FROM ledger_entries le
WHERE le.reference_id LIKE 'BACKFILL-%'
  AND NOT EXISTS (
    SELECT 1 FROM dual_ledger_outbox dlo
    WHERE dlo.reference_id = le.reference_id
  );

Implementation Scripts

TypeScript Backfill Script

File: dbis_core/scripts/backfill-ledger.ts

#!/usr/bin/env ts-node
import { PrismaClient } from '@prisma/client';
import { readFileSync } from 'fs';
import { parse } from 'csv-parse/sync';

const prisma = new PrismaClient();

interface BackfillConfig {
  sourceFile: string;
  ledgerId: string;
  batchSize: number;
  skipDuplicates: boolean;
}

async function backfillFromCSV(config: BackfillConfig) {
  // Read and parse CSV
  const csvData = readFileSync(config.sourceFile, 'utf-8');
  const records = parse(csvData, {
    columns: true,
    skip_empty_lines: true,
  });

  let processed = 0;
  let successful = 0;
  let failed = 0;

  // Process in batches
  for (let i = 0; i < records.length; i += config.batchSize) {
    const batch = records.slice(i, i + config.batchSize);
    
    for (const record of batch) {
      try {
        // Transform and post entry
        await prisma.$executeRaw`
          SELECT post_ledger_entry(
            ${config.ledgerId}::TEXT,
            ${record.debitAccountId}::TEXT,
            ${record.creditAccountId}::TEXT,
            ${record.amount}::NUMERIC,
            ${record.currencyCode}::TEXT,
            ${record.assetType || 'fiat'}::TEXT,
            ${record.transactionType}::TEXT,
            ${record.referenceId}::TEXT,
            ${record.fxRate || null}::NUMERIC,
            ${record.metadata ? JSON.stringify(JSON.parse(record.metadata)) : null}::JSONB
          )
        `;
        successful++;
      } catch (error: any) {
        if (config.skipDuplicates && error.code === '23505') {
          // Skip duplicates, but still count them as processed
          processed++;
          continue;
        }
        failed++;
        console.error(`Failed to post entry ${record.referenceId}:`, error.message);
      }
      processed++;
    }

    console.log(`Processed ${processed}/${records.length} entries (${successful} successful, ${failed} failed)`);
  }

  return { processed, successful, failed };
}

// CLI entry point
const config: BackfillConfig = {
  sourceFile: process.env.BACKFILL_SOURCE_FILE || 'backfill.csv',
  ledgerId: process.env.LEDGER_ID || 'MASTER',
  batchSize: parseInt(process.env.BATCH_SIZE || '1000', 10),
  skipDuplicates: process.env.SKIP_DUPLICATES === 'true',
};

backfillFromCSV(config)
  .then(async (result) => {
    console.log('Backfill completed:', result);
    // Disconnect before exiting; process.exit() would otherwise
    // terminate the process before a .finally() handler could run.
    await prisma.$disconnect();
    process.exit(0);
  })
  .catch(async (error) => {
    console.error('Backfill failed:', error);
    await prisma.$disconnect();
    process.exit(1);
  });

SQL Backfill Script

File: dbis_core/scripts/backfill-ledger.sql

-- Ledger Backfill Script
-- Use this for direct SQL-based backfill from another database

-- Example: Backfill from external ledger_entries_legacy table
DO $$
DECLARE
  batch_size INTEGER := 1000;
  processed INTEGER := 0;
  successful INTEGER := 0;
  skipped INTEGER := 0;
  failed INTEGER := 0;
  entry RECORD;
BEGIN
  -- Process rows one at a time; duplicates are skipped, other errors are
  -- logged and counted so a single bad row does not abort the run.
  FOR entry IN 
    SELECT * FROM ledger_entries_legacy
    ORDER BY id
  LOOP
    BEGIN
      -- Post entry using atomic function
      PERFORM post_ledger_entry(
        entry.ledger_id::TEXT,
        entry.debit_account_id::TEXT,
        entry.credit_account_id::TEXT,
        entry.amount::NUMERIC,
        entry.currency_code::TEXT,
        entry.asset_type::TEXT,
        entry.transaction_type::TEXT,
        entry.reference_id::TEXT,
        entry.fx_rate::NUMERIC,
        entry.metadata::JSONB
      );
      
      successful := successful + 1;
    EXCEPTION
      WHEN unique_violation THEN
        -- Duplicate reference ID: already backfilled, count as skipped
        skipped := skipped + 1;
        RAISE NOTICE 'Skipping duplicate reference ID: %', entry.reference_id;
      WHEN OTHERS THEN
        failed := failed + 1;
        RAISE NOTICE 'Error processing entry %: %', entry.reference_id, SQLERRM;
    END;

    processed := processed + 1;

    -- Log progress every batch_size entries
    IF processed % batch_size = 0 THEN
      RAISE NOTICE 'Processed % entries (% successful, % skipped, % failed)', processed, successful, skipped, failed;
    END IF;
  END LOOP;

  RAISE NOTICE 'Backfill completed: % total, % successful, % skipped, % failed', processed, successful, skipped, failed;
END $$;

Best Practices

1. Idempotency

  • Always use unique reference_id for each entry
  • Use atomic posting function that enforces idempotency
  • Skip duplicates during backfill if they already exist

2. Checkpointing

  • Save checkpoint after each batch
  • Enable resumable backfill from last checkpoint
  • Track progress with metrics (processed, successful, failed)

3. Validation

  • Validate source data before transformation
  • Validate transformed entries before posting
  • Verify balances after backfill completion
  • Reconcile with source system if possible

4. Error Handling

  • Log all errors with full context
  • Retry transient errors with exponential backoff
  • Skip permanent errors (e.g., duplicate reference IDs)
  • Generate error report after completion
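
The config in Step 3 names maxRetries and retryDelay, but the sample loop never uses them. A sketch of an exponential-backoff wrapper built on those two knobs:

```typescript
// Retry an async operation with exponential backoff, driven by the
// maxRetries / retryDelay fields of BackfillConfig. shouldRetry lets
// the caller skip retries for permanent errors (e.g. duplicates).
async function withRetry<T>(
  op: () => Promise<T>,
  opts: { maxRetries: number; retryDelay: number },
  shouldRetry: (err: unknown) => boolean = () => true,
): Promise<T> {
  let delay = opts.retryDelay;
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      if (attempt >= opts.maxRetries || !shouldRetry(err)) throw err;
      await new Promise((resolve) => setTimeout(resolve, delay));
      delay *= 2; // exponential backoff
    }
  }
}
```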

5. Performance

  • Process in batches (1000-10000 entries per batch)
  • Use parallel processing for large backfills
  • Monitor database performance during backfill
  • Schedule during low-traffic periods

6. Testing

  • Test backfill process on staging environment first
  • Use small test dataset to verify transformation
  • Verify balances match expected values
  • Test rollback procedures if needed

Monitoring and Metrics

Key Metrics to Track

  1. Progress Metrics:

    • Total entries to process
    • Entries processed
    • Entries successful
    • Entries failed
    • Processing rate (entries/second)
  2. Performance Metrics:

    • Batch processing time
    • Database query time
    • Checkpoint save time
    • Total elapsed time
  3. Quality Metrics:

    • Duplicate entries skipped
    • Validation errors
    • Balance inconsistencies
    • Reconciliation mismatches
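
The progress metrics above reduce to a processing rate and a naive ETA; a minimal sketch (field names are illustrative):

```typescript
// Derive processing rate (entries/second) and a naive ETA from the
// progress counters tracked during backfill.
interface Progress {
  total: number;
  processed: number;
  startedAtMs: number;
  nowMs: number;
}

function progressStats(p: Progress): { ratePerSec: number; etaSec: number } {
  const elapsedSec = Math.max((p.nowMs - p.startedAtMs) / 1000, 1e-9);
  const ratePerSec = p.processed / elapsedSec;
  const remaining = Math.max(p.total - p.processed, 0);
  const etaSec = ratePerSec > 0 ? remaining / ratePerSec : Infinity;
  return { ratePerSec, etaSec };
}
```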

Monitoring Queries

-- Check backfill progress
SELECT 
  source_id,
  source_type,
  status,
  total_processed,
  total_successful,
  total_failed,
  last_checkpoint_at,
  NOW() - last_checkpoint_at as time_since_last_checkpoint
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS';

-- Check for stalled backfills
SELECT *
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS'
  AND last_checkpoint_at < NOW() - INTERVAL '1 hour';

-- Verify backfill completion
SELECT 
  COUNT(*) as total_entries,
  MIN(timestamp_utc) as earliest,
  MAX(timestamp_utc) as latest
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';

Rollback Procedures

Scenario 1: Rollback Before Constraints Applied

If constraints have not been applied, rollback is straightforward:

-- Reverse the balance effects first, while the backfilled entries are
-- still present (deleting the entries first would make these sums zero)
UPDATE bank_accounts
SET balance = balance - (
  SELECT COALESCE(SUM(amount), 0)
  FROM ledger_entries
  WHERE debit_account_id = bank_accounts.id
    AND reference_id LIKE 'BACKFILL-%'
) + (
  SELECT COALESCE(SUM(amount), 0)
  FROM ledger_entries
  WHERE credit_account_id = bank_accounts.id
    AND reference_id LIKE 'BACKFILL-%'
);

-- Remove backfilled entries
DELETE FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';

-- Remove outbox records
DELETE FROM dual_ledger_outbox
WHERE reference_id LIKE 'BACKFILL-%';

Scenario 2: Rollback After Constraints Applied

If constraints have been applied, rollback is more complex:

  1. Temporarily disable constraints
  2. Remove backfilled entries
  3. Recalculate balances
  4. Re-enable constraints
  5. Verify balance consistency

Note: This should only be done during a maintenance window.


Troubleshooting

Common Issues

1. Duplicate Reference ID Errors

Symptom: unique_violation error on reference_id

Solution:

  • Check if entries were already backfilled
  • Use skipDuplicates: true to skip existing entries
  • Or regenerate reference IDs for duplicates

2. Balance Inconsistencies

Symptom: Balance validation fails

Solution:

  • Identify accounts with inconsistent balances
  • Generate correction entries
  • Post corrections before applying constraints
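
Since corrections must flow through the normal posting function, it helps to express each observed drift as an entry. A sketch, where SYS-ADJUSTMENT is an illustrative name for a dedicated suspense account (a production version should also derive a deterministic referenceId rather than use Date.now(), to keep corrections idempotent):

```typescript
// Turn an observed balance drift into a correction entry that can be
// posted through the normal posting function. ADJUSTMENT_ACCOUNT is an
// illustrative name for a dedicated suspense account.
const ADJUSTMENT_ACCOUNT = 'SYS-ADJUSTMENT';

interface CorrectionEntry {
  debitAccountId: string;
  creditAccountId: string;
  amount: string;
  transactionType: 'BALANCE_CORRECTION';
  referenceId: string;
}

function buildCorrection(
  accountId: string,
  expected: number,
  actual: number,
): CorrectionEntry | null {
  const diff = expected - actual;
  if (diff === 0) return null;
  return {
    // Credit the account when it is short, debit it when it is over.
    debitAccountId: diff > 0 ? ADJUSTMENT_ACCOUNT : accountId,
    creditAccountId: diff > 0 ? accountId : ADJUSTMENT_ACCOUNT,
    amount: Math.abs(diff).toFixed(2),
    transactionType: 'BALANCE_CORRECTION',
    referenceId: `CORRECTION-${accountId}-${Date.now()}`,
  };
}
```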

3. Slow Performance

Symptom: Backfill processing is slow

Solution:

  • Increase batch size (if memory allows)
  • Use parallel processing
  • Optimize database indexes
  • Run during off-peak hours

4. Out of Memory

Symptom: Process runs out of memory

Solution:

  • Reduce batch size
  • Process sequentially instead of parallel
  • Use streaming instead of loading all data
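
The streaming approach can be sketched as an async generator that groups any async-iterable source (e.g., a readline interface over a large CSV) into fixed-size batches, so memory use stays bounded by the batch size:

```typescript
// Group any async-iterable source into fixed-size batches without
// loading the whole dataset into memory.
async function* inBatches<T>(
  source: AsyncIterable<T> | Iterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // flush the final partial batch
}
```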

Examples

Example 1: CSV Backfill

# Configure environment
export DATABASE_URL="postgresql://user:password@host:port/database"
export BACKFILL_SOURCE_FILE="ledger_export.csv"
export LEDGER_ID="MASTER"
export BATCH_SIZE="1000"
export SKIP_DUPLICATES="true"

# Run backfill script
cd dbis_core
ts-node scripts/backfill-ledger.ts

Example 2: SCB Ledger Sync

// Backfill from SCB ledger via API
async function backfillFromSCB(sovereignBankId: string, startDate: Date, endDate: Date) {
  const scbApi = new SCBLedgerAPI(sovereignBankId);
  const entries = await scbApi.getLedgerEntries(startDate, endDate);

  for (const entry of entries) {
    // Transform SCB entry to DBIS format
    const dbisEntry = transformSCBEntry(entry);

    // Post to DBIS (will create outbox record for dual-ledger sync)
    await ledgerPostingModule.postEntry(dbisEntry);
  }
}

References

  • Migration Files: dbis_core/db/migrations/
  • Ledger Posting Module: dbis_core/src/core/ledger/ledger-posting.module.ts
  • Atomic Posting Function: dbis_core/db/migrations/005_post_ledger_entry.sql
  • Block Indexer Backfill: explorer-monorepo/backend/indexer/backfill/backfill.go (reference pattern)