API: Phoenix railing proxy, API key auth for /api/v1/*, schema export, docs, migrations, tests

- Phoenix API Railing: proxy to PHOENIX_RAILING_URL, tenant me routes
- Tenant-auth: X-API-Key support for /api/v1/* (api_keys table)
- Migration 026: api_keys table; 025 sovereign stack marketplace
- GET /graphql/schema, GET /graphql-playground, api/docs OpenAPI
- Integration tests: phoenix-railing.test.ts
- docs/api/API_VERSIONING: /api/v1/ railing alignment
- docs/phoenix/PORTAL_RAILING_WIRING

Made-with: Cursor
This commit is contained in:
defiQUG
2026-03-11 12:57:41 -07:00
parent 33b02b636b
commit 8436e22f4c
45 changed files with 4308 additions and 17 deletions

View File

@@ -0,0 +1,40 @@
/**
 * Integration tests for Phoenix API Railing routes:
 * /api/v1/tenants/me/resources, /api/v1/tenants/me/health (tenant-scoped).
 */
import { describe, it, expect, beforeAll, afterAll } from 'vitest'
import Fastify from 'fastify'
import { registerPhoenixRailingRoutes } from '../../routes/phoenix-railing.js'

describe('Phoenix Railing routes', () => {
  let app: Awaited<ReturnType<typeof Fastify>>

  beforeAll(async () => {
    app = Fastify({ logger: false })
    // Routes read request.tenantContext; decorate so the property exists
    // even though no auth hook populates it in this test harness.
    app.decorateRequest('tenantContext', null)
    await registerPhoenixRailingRoutes(app)
  })

  afterAll(async () => {
    await app.close()
  })

  /** Fires a GET via fastify.inject and returns status plus parsed JSON body. */
  const injectGet = async (url: string) => {
    const response = await app.inject({ method: 'GET', url })
    return { status: response.statusCode, body: JSON.parse(response.payload) }
  }

  describe('GET /api/v1/tenants/me/resources', () => {
    it('returns 401 when no tenant context', async () => {
      const { status, body } = await injectGet('/api/v1/tenants/me/resources')
      expect(status).toBe(401)
      expect(body.error).toMatch(/tenant|required/i)
    })
  })

  describe('GET /api/v1/tenants/me/health', () => {
    it('returns 401 when no tenant context', async () => {
      const { status, body } = await injectGet('/api/v1/tenants/me/health')
      expect(status).toBe(401)
      expect(body.error).toBeDefined()
    })
  })
})

View File

@@ -61,15 +61,24 @@ export const up: Migration['up'] = async (db) => {
await db.query(`CREATE INDEX IF NOT EXISTS idx_resources_status ON resources(status)`)
await db.query(`CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)`)
// Triggers
// Triggers (use DROP IF EXISTS to avoid conflicts)
await db.query(`
DROP TRIGGER IF EXISTS update_users_updated_at ON users
`)
await db.query(`
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
`)
await db.query(`
DROP TRIGGER IF EXISTS update_sites_updated_at ON sites
`)
await db.query(`
CREATE TRIGGER update_sites_updated_at BEFORE UPDATE ON sites
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
`)
await db.query(`
DROP TRIGGER IF EXISTS update_resources_updated_at ON resources
`)
await db.query(`
CREATE TRIGGER update_resources_updated_at BEFORE UPDATE ON resources
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()

View File

@@ -58,15 +58,18 @@ export const up: Migration['up'] = async (db) => {
await db.query(`
INSERT INTO industry_controls (industry, pillar, control_code, name, description, compliance_frameworks, requirements)
VALUES
('FINANCIAL', 'SECURITY', 'PCI-DSS-1', 'PCI-DSS Compliance', 'Payment card industry data security', ARRAY['PCI-DSS'], ARRAY['Encrypt cardholder data', 'Restrict access']),
('FINANCIAL', 'SECURITY', 'SOX-1', 'SOX Financial Controls', 'Sarbanes-Oxley financial reporting controls', ARRAY['SOX'], ARRAY['Financial audit trail', 'Access controls']),
('FINANCIAL', 'RELIABILITY', 'FIN-REL-1', 'Financial System Availability', 'High availability for financial systems', ARRAY[], ARRAY['99.99% uptime', 'Disaster recovery']),
('TELECOMMUNICATIONS', 'SECURITY', 'CALEA-1', 'CALEA Compliance', 'Lawful intercept capabilities', ARRAY['CALEA'], ARRAY['Intercept capability', 'Audit logging']),
('TELECOMMUNICATIONS', 'RELIABILITY', 'TEL-REL-1', 'Network Availability', 'Telecom network reliability', ARRAY[], ARRAY['99.999% uptime', 'Redundancy'])
('FINANCIAL', 'SECURITY', 'PCI-DSS-1', 'PCI-DSS Compliance', 'Payment card industry data security', ARRAY['PCI-DSS']::TEXT[], ARRAY['Encrypt cardholder data', 'Restrict access']::TEXT[]),
('FINANCIAL', 'SECURITY', 'SOX-1', 'SOX Financial Controls', 'Sarbanes-Oxley financial reporting controls', ARRAY['SOX']::TEXT[], ARRAY['Financial audit trail', 'Access controls']::TEXT[]),
('FINANCIAL', 'RELIABILITY', 'FIN-REL-1', 'Financial System Availability', 'High availability for financial systems', ARRAY[]::TEXT[], ARRAY['99.99% uptime', 'Disaster recovery']::TEXT[]),
('TELECOMMUNICATIONS', 'SECURITY', 'CALEA-1', 'CALEA Compliance', 'Lawful intercept capabilities', ARRAY['CALEA']::TEXT[], ARRAY['Intercept capability', 'Audit logging']::TEXT[]),
('TELECOMMUNICATIONS', 'RELIABILITY', 'TEL-REL-1', 'Network Availability', 'Telecom network reliability', ARRAY[]::TEXT[], ARRAY['99.999% uptime', 'Redundancy']::TEXT[])
ON CONFLICT (industry, pillar, control_code) DO NOTHING
`)
// Update triggers
await db.query(`
DROP TRIGGER IF EXISTS update_industry_controls_updated_at ON industry_controls
`)
await db.query(`
CREATE TRIGGER update_industry_controls_updated_at
BEFORE UPDATE ON industry_controls
@@ -74,6 +77,9 @@ export const up: Migration['up'] = async (db) => {
EXECUTE FUNCTION update_updated_at_column()
`)
await db.query(`
DROP TRIGGER IF EXISTS update_waf_assessments_updated_at ON waf_assessments
`)
await db.query(`
CREATE TRIGGER update_waf_assessments_updated_at
BEFORE UPDATE ON waf_assessments

View File

@@ -0,0 +1,88 @@
import { Migration } from '../migrate.js'
/**
 * Migration 025 (up): widen the products.category CHECK constraint with the
 * Sovereign Stack service categories and upsert the Phoenix Cloud Services
 * publisher row that the Sovereign Stack seed script depends on.
 */
export const up: Migration['up'] = async (db) => {
  // Add new product categories for Sovereign Stack services
  // We need to drop and recreate the constraint to add new categories
  // (Postgres cannot alter a CHECK constraint's definition in place).
  await db.query(`
    ALTER TABLE products
    DROP CONSTRAINT IF EXISTS products_category_check
  `)
  await db.query(`
    ALTER TABLE products
    ADD CONSTRAINT products_category_check
    CHECK (category IN (
      'COMPUTE',
      'NETWORK_INFRA',
      'BLOCKCHAIN_STACK',
      'BLOCKCHAIN_TOOLS',
      'FINANCIAL_MESSAGING',
      'INTERNET_REGISTRY',
      'AI_LLM_AGENT',
      'LEDGER_SERVICES',
      'IDENTITY_SERVICES',
      'WALLET_SERVICES',
      'ORCHESTRATION_SERVICES',
      'PLATFORM_SERVICES'
    ))
  `)
  // Create Phoenix Cloud Services publisher if it doesn't exist.
  // ON CONFLICT (name) DO UPDATE keeps the row current on re-runs, so the
  // migration is idempotent.
  await db.query(`
    INSERT INTO publishers (
      name,
      display_name,
      description,
      website_url,
      logo_url,
      verified,
      metadata,
      created_at,
      updated_at
    )
    VALUES (
      'phoenix-cloud-services',
      'Phoenix Cloud Services',
      'Sovereign cloud infrastructure provider powering the Sankofa ecosystem. Phoenix delivers world-class cloud services with multi-tenancy, sovereign identity, and advanced billing capabilities.',
      'https://phoenix.sankofa.nexus',
      'https://cdn.sankofa.nexus/phoenix-logo.svg',
      true,
      '{"provider": "phoenix", "tier": "sovereign", "regions": 325, "sovereign_identity": true}'::jsonb,
      NOW(),
      NOW()
    )
    ON CONFLICT (name) DO UPDATE SET
      display_name = EXCLUDED.display_name,
      description = EXCLUDED.description,
      website_url = EXCLUDED.website_url,
      logo_url = EXCLUDED.logo_url,
      verified = true,
      metadata = EXCLUDED.metadata,
      updated_at = NOW()
  `)
}
/**
 * Migration 025 (down): restore the pre-Sovereign-Stack category constraint.
 *
 * NOTE(review): ADD CONSTRAINT validates existing rows, so this rollback
 * fails if any product already uses one of the new categories — presumably
 * such rows are removed first; confirm before running down in production.
 */
export const down: Migration['down'] = async (db) => {
  // Remove new categories, reverting to original set
  await db.query(`
    ALTER TABLE products
    DROP CONSTRAINT IF EXISTS products_category_check
  `)
  await db.query(`
    ALTER TABLE products
    ADD CONSTRAINT products_category_check
    CHECK (category IN (
      'COMPUTE',
      'NETWORK_INFRA',
      'BLOCKCHAIN_STACK',
      'BLOCKCHAIN_TOOLS',
      'FINANCIAL_MESSAGING',
      'INTERNET_REGISTRY',
      'AI_LLM_AGENT'
    ))
  `)
  // Note: We don't delete the Phoenix publisher in down migration
  // as it may have been created manually or have dependencies
}

View File

@@ -0,0 +1,45 @@
import { Migration } from '../migrate.js'
/**
* API keys table for client/partner API access (key hash, tenant_id, scopes).
* Used by X-API-Key auth for /api/v1/* and Phoenix API Railing.
*/
export const up: Migration['up'] = async (db) => {
  // Keys are stored as a hash (key_hash) plus a short display prefix
  // (key_prefix); the raw key value is never persisted.
  // NOTE(review): uuid_generate_v4() requires the uuid-ossp extension —
  // presumably enabled by an earlier migration; confirm.
  await db.query(`
    CREATE TABLE IF NOT EXISTS api_keys (
      id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
      name VARCHAR(255) NOT NULL,
      key_prefix VARCHAR(20) NOT NULL,
      key_hash VARCHAR(255) NOT NULL UNIQUE,
      user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
      tenant_id UUID REFERENCES tenants(id) ON DELETE SET NULL,
      permissions JSONB DEFAULT '["read", "write"]'::jsonb,
      last_used_at TIMESTAMP WITH TIME ZONE,
      expires_at TIMESTAMP WITH TIME ZONE,
      revoked BOOLEAN NOT NULL DEFAULT false,
      revoked_at TIMESTAMP WITH TIME ZONE,
      created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
      updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    )
  `)
  // Lookup indexes; the partial index on revoked only covers active keys,
  // matching the common "non-revoked key" query path.
  await db.query(`CREATE INDEX IF NOT EXISTS idx_api_keys_user_id ON api_keys(user_id)`)
  await db.query(`CREATE INDEX IF NOT EXISTS idx_api_keys_tenant_id ON api_keys(tenant_id)`)
  await db.query(`CREATE INDEX IF NOT EXISTS idx_api_keys_key_hash ON api_keys(key_hash)`)
  await db.query(`CREATE INDEX IF NOT EXISTS idx_api_keys_revoked ON api_keys(revoked) WHERE revoked = false`)
  // updated_at maintenance trigger; DROP first so re-running the migration
  // does not fail on an existing trigger.
  await db.query(`
    DROP TRIGGER IF EXISTS update_api_keys_updated_at ON api_keys
  `)
  await db.query(`
    CREATE TRIGGER update_api_keys_updated_at BEFORE UPDATE ON api_keys
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
  `)
}
// Rollback for migration 026: drop the trigger, indexes, then the table.
// (DROP TABLE would remove the indexes implicitly; the explicit drops keep
// the rollback symmetric with up and safe if a previous run stopped partway.)
export const down: Migration['down'] = async (db) => {
  await db.query(`DROP TRIGGER IF EXISTS update_api_keys_updated_at ON api_keys`)
  await db.query(`DROP INDEX IF EXISTS idx_api_keys_revoked`)
  await db.query(`DROP INDEX IF EXISTS idx_api_keys_key_hash`)
  await db.query(`DROP INDEX IF EXISTS idx_api_keys_tenant_id`)
  await db.query(`DROP INDEX IF EXISTS idx_api_keys_user_id`)
  await db.query(`DROP TABLE IF EXISTS api_keys`)
}

View File

@@ -17,4 +17,13 @@ export { up as up013, down as down013 } from './013_mfa_and_rbac.js'
export { up as up014, down as down014 } from './014_audit_logging.js'
export { up as up015, down as down015 } from './015_incident_response_and_classification.js'
export { up as up016, down as down016 } from './016_resource_sharing.js'
export { up as up017, down as down017 } from './017_marketplace_catalog.js'
export { up as up018, down as down018 } from './018_templates.js'
export { up as up019, down as down019 } from './019_deployments.js'
export { up as up020, down as down020 } from './020_blockchain_networks.js'
export { up as up021, down as down021 } from './021_workflows.js'
export { up as up022, down as down022 } from './022_pop_mappings_and_federation.js'
export { up as up023, down as down023 } from './023_industry_controls_and_waf.js'
export { up as up024, down as down024 } from './024_compliance_audit.js'
export { up as up025, down as down025 } from './025_sovereign_stack_marketplace.js'
export { up as up026, down as down026 } from './026_api_keys.js'

View File

@@ -1,5 +1,6 @@
import 'dotenv/config'
import { getDb } from './index.js'
import { logger } from '../lib/logger.js'
import bcrypt from 'bcryptjs'
async function seed() {

View File

@@ -0,0 +1,625 @@
import 'dotenv/config'
import { getDb } from '../index.js'
import { logger } from '../../lib/logger.js'
/**
 * Shape of one Sovereign Stack marketplace service entry, consumed by the
 * seed routine below to create product, product-version, and pricing rows.
 */
interface ServiceDefinition {
  name: string
  /** URL-safe unique key; products are upserted ON CONFLICT (slug). */
  slug: string
  /** Should match a category allowed by products_category_check (migration 025). */
  category: string
  description: string
  shortDescription: string
  tags: string[]
  /** Marks the product as featured in the marketplace listing. */
  featured: boolean
  iconUrl?: string
  documentationUrl?: string
  supportUrl?: string
  /** Free-form service metadata (endpoints, features, SLA, …); serialized to JSONB. */
  metadata: Record<string, any>
  /** Pricing model discriminator (e.g. 'USAGE_BASED', 'SUBSCRIPTION', 'HYBRID'); stored as-is. */
  pricingType: string
  /** Pricing details; whole object is also persisted in pricing_models.metadata. */
  pricingConfig: {
    basePrice?: number
    currency?: string
    billingPeriod?: string
    usageRates?: Record<string, any>
    freeTier?: {
      requestsPerMonth?: number
      features?: string[]
    }
  }
}
const services: ServiceDefinition[] = [
{
name: 'Phoenix Ledger Service',
slug: 'phoenix-ledger-service',
category: 'LEDGER_SERVICES',
description: `A sovereign-grade double-entry ledger system with virtual accounts, holds, and multi-asset support.
Replaces reliance on external platforms (e.g., Tatum Virtual Accounts) with owned core primitives.
Features include journal entries, subaccounts, holds/reserves, reconciliation, and full audit trail.
Every transaction is a balanced journal entry with idempotency via correlation_id.
Supports multi-asset operations (fiat, stablecoins, tokens) with state machine-based settlement.`,
shortDescription: 'Double-entry ledger with virtual accounts, holds, and multi-asset support',
tags: ['ledger', 'double-entry', 'virtual-accounts', 'financial', 'sovereign', 'audit'],
featured: true,
iconUrl: 'https://cdn.sankofa.nexus/services/ledger.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/ledger',
supportUrl: 'https://support.sankofa.nexus/ledger',
metadata: {
apiEndpoints: [
'POST /ledger/entries',
'POST /ledger/holds',
'POST /ledger/transfers',
'GET /ledger/balances'
],
features: [
'Double-entry accounting',
'Virtual account abstraction',
'Holds and reserves',
'Multi-asset support',
'Reconciliation engine',
'Immutable audit trail',
'Idempotent operations',
'State machine settlement'
],
compliance: ['SOC 2', 'PCI DSS', 'GDPR'],
providerAdapters: [],
sla: {
uptime: '99.9%',
latency: '<100ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/ledger-service.md'
},
pricingType: 'USAGE_BASED',
pricingConfig: {
currency: 'USD',
usageRates: {
journalEntry: 0.001,
holdOperation: 0.0005,
transfer: 0.002
},
freeTier: {
requestsPerMonth: 10000,
features: ['Basic ledger operations', 'Up to 100 virtual accounts']
}
}
},
{
name: 'Phoenix Identity Service',
slug: 'phoenix-identity-service',
category: 'IDENTITY_SERVICES',
description: `Comprehensive identity, authentication, and authorization service with support for users, organizations,
roles, and permissions. Features device binding, passkeys support, OAuth/OpenID Connect for integrations,
session management, and risk scoring. Centralizes identity management with no provider dependencies.
Supports multi-tenant identity with fine-grained RBAC and sovereign identity principles.`,
shortDescription: 'Users, orgs, roles, permissions, device binding, passkeys, OAuth/OIDC',
tags: ['identity', 'auth', 'rbac', 'oauth', 'oidc', 'passkeys', 'sovereign'],
featured: true,
iconUrl: 'https://cdn.sankofa.nexus/services/identity.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/identity',
supportUrl: 'https://support.sankofa.nexus/identity',
metadata: {
apiEndpoints: [
'POST /identity/users',
'POST /identity/orgs',
'GET /identity/sessions',
'POST /identity/auth/token'
],
features: [
'Multi-tenant identity',
'RBAC with fine-grained permissions',
'Device binding',
'Passkeys support',
'OAuth 2.0 / OIDC',
'Session management',
'Risk scoring',
'SCIM support'
],
compliance: ['SOC 2', 'GDPR', 'HIPAA'],
providerAdapters: [],
sla: {
uptime: '99.95%',
latency: '<50ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/identity-service.md'
},
pricingType: 'SUBSCRIPTION',
pricingConfig: {
basePrice: 99,
currency: 'USD',
billingPeriod: 'MONTHLY',
usageRates: {
perUser: 2.50,
perOrg: 50.00
}
}
},
{
name: 'Phoenix Wallet Registry',
slug: 'phoenix-wallet-registry',
category: 'WALLET_SERVICES',
description: `Wallet mapping and signing policy service with chain support matrix and policy engine.
Manages wallet mapping (user/org ↔ wallet addresses), chain support, policy engine for signing limits and approvals,
and recovery policies. Supports MPC (preferred for production custody), HSM-backed keys for service wallets,
and passkeys + account abstraction for end-users. Features transaction simulation and ERC-4337 smart accounts.`,
shortDescription: 'Wallet mapping, chain support, policy engine, recovery, MPC, HSM',
tags: ['wallet', 'blockchain', 'mpc', 'hsm', 'erc4337', 'custody', 'signing'],
featured: true,
iconUrl: 'https://cdn.sankofa.nexus/services/wallet.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/wallet',
supportUrl: 'https://support.sankofa.nexus/wallet',
metadata: {
apiEndpoints: [
'POST /wallets/register',
'POST /wallets/tx/build',
'POST /wallets/tx/simulate',
'POST /wallets/tx/submit'
],
features: [
'Wallet mapping and registry',
'Multi-chain support',
'MPC custody',
'HSM-backed keys',
'Transaction simulation',
'ERC-4337 smart accounts',
'Policy engine',
'Recovery policies'
],
compliance: ['SOC 2', 'ISO 27001'],
providerAdapters: ['Thirdweb (optional)'],
sla: {
uptime: '99.9%',
latency: '<200ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/wallet-registry.md'
},
pricingType: 'HYBRID',
pricingConfig: {
basePrice: 199,
currency: 'USD',
billingPeriod: 'MONTHLY',
usageRates: {
perWallet: 5.00,
perTransaction: 0.01
}
}
},
{
name: 'Phoenix Transaction Orchestrator',
slug: 'phoenix-tx-orchestrator',
category: 'ORCHESTRATION_SERVICES',
description: `On-chain and off-chain workflow orchestration service with retries, compensations,
provider routing, and fallback. Implements state machines for workflow management, enforces idempotency
and exactly-once semantics (logical), and provides provider routing with automatic failover.
Supports both on-chain blockchain transactions and off-chain operations with unified orchestration.`,
shortDescription: 'On-chain/off-chain workflow orchestration with retries and compensations',
tags: ['orchestration', 'workflow', 'blockchain', 'state-machine', 'idempotency', 'transactions'],
featured: true,
iconUrl: 'https://cdn.sankofa.nexus/services/tx-orchestrator.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/tx-orchestrator',
supportUrl: 'https://support.sankofa.nexus/tx-orchestrator',
metadata: {
apiEndpoints: [
'POST /orchestrator/workflows',
'GET /orchestrator/workflows/{id}',
'POST /orchestrator/workflows/{id}/retry'
],
features: [
'Workflow state machines',
'Retries and compensations',
'Provider routing and fallback',
'Idempotency enforcement',
'Exactly-once semantics',
'On-chain and off-chain support',
'Correlation ID tracking'
],
compliance: ['SOC 2'],
providerAdapters: ['Alchemy', 'Infura', 'Self-hosted nodes'],
sla: {
uptime: '99.9%',
latency: '<500ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/tx-orchestrator.md'
},
pricingType: 'USAGE_BASED',
pricingConfig: {
currency: 'USD',
usageRates: {
perTransaction: 0.05,
perWorkflow: 0.10
},
freeTier: {
requestsPerMonth: 1000,
features: ['Basic orchestration', 'Up to 10 concurrent workflows']
}
}
},
{
name: 'Phoenix Messaging Orchestrator',
slug: 'phoenix-messaging-orchestrator',
category: 'ORCHESTRATION_SERVICES',
description: `Multi-provider messaging orchestration service with failover for SMS, voice, email, and push notifications.
Features provider selection rules based on cost, deliverability, region, and user preference.
Includes delivery receipts, retries, suppression lists, and compliance features.
Replaces reliance on Twilio with owned core primitives while retaining optional provider integrations via adapters.`,
shortDescription: 'Multi-provider messaging (SMS/voice/email/push) with failover',
tags: ['messaging', 'sms', 'email', 'push', 'notifications', 'orchestration', 'failover'],
featured: true,
iconUrl: 'https://cdn.sankofa.nexus/services/messaging.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/messaging',
supportUrl: 'https://support.sankofa.nexus/messaging',
metadata: {
apiEndpoints: [
'POST /messages/send',
'GET /messages/status/{id}',
'GET /messages/delivery/{id}'
],
features: [
'Multi-provider routing',
'Automatic failover',
'Delivery receipts',
'Retry logic',
'Suppression lists',
'Template management',
'Compliance features',
'Cost optimization'
],
compliance: ['SOC 2', 'GDPR', 'TCPA'],
providerAdapters: ['Twilio', 'AWS SNS', 'Vonage', 'MessageBird'],
sla: {
uptime: '99.9%',
latency: '<200ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/messaging-orchestrator.md'
},
pricingType: 'USAGE_BASED',
pricingConfig: {
currency: 'USD',
usageRates: {
perSMS: 0.01,
perEmail: 0.001,
perPush: 0.0005,
perVoice: 0.02
},
freeTier: {
requestsPerMonth: 1000,
features: ['Basic messaging', 'Single provider']
}
}
},
{
name: 'Phoenix Voice Orchestrator',
slug: 'phoenix-voice-orchestrator',
category: 'ORCHESTRATION_SERVICES',
description: `Text-to-speech and speech-to-text orchestration service with audio caching,
multi-provider routing, and moderation. Features deterministic caching (hash-based) for cost and latency optimization,
PII scrubbing, multi-model routing for high quality vs low-latency scenarios, and OSS fallback path for baseline TTS.
Replaces reliance on ElevenLabs with owned core primitives while retaining optional provider integrations.`,
shortDescription: 'TTS/STT with caching, multi-provider routing, moderation',
tags: ['voice', 'tts', 'stt', 'audio', 'media', 'orchestration', 'ai'],
featured: true,
iconUrl: 'https://cdn.sankofa.nexus/services/voice.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/voice',
supportUrl: 'https://support.sankofa.nexus/voice',
metadata: {
apiEndpoints: [
'POST /voice/synthesize',
'GET /voice/audio/{hash}',
'POST /voice/transcribe'
],
features: [
'Audio caching',
'Multi-provider routing',
'PII scrubbing',
'Moderation',
'Multi-model support',
'OSS fallback',
'CDN delivery'
],
compliance: ['SOC 2', 'GDPR'],
providerAdapters: ['ElevenLabs', 'OpenAI', 'Azure TTS', 'OSS TTS'],
sla: {
uptime: '99.9%',
latency: '<500ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/voice-orchestrator.md'
},
pricingType: 'USAGE_BASED',
pricingConfig: {
currency: 'USD',
usageRates: {
perSynthesis: 0.02,
perMinute: 0.10,
perTranscription: 0.05
},
freeTier: {
requestsPerMonth: 100,
features: ['Basic TTS/STT', 'Standard quality']
}
}
},
{
name: 'Phoenix Event Bus',
slug: 'phoenix-event-bus',
category: 'PLATFORM_SERVICES',
description: `Durable event bus service with replay, versioning, and consumer idempotency.
Implements DB Outbox pattern for atomic state + event writes. Supports Kafka, Redpanda, and NATS backends.
Features event versioning, consumer offset tracking, and processed correlation ID tracking for exactly-once delivery.`,
shortDescription: 'Durable events, replay, versioning, consumer idempotency',
tags: ['events', 'messaging', 'kafka', 'outbox', 'event-sourcing', 'platform'],
featured: false,
iconUrl: 'https://cdn.sankofa.nexus/services/event-bus.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/event-bus',
supportUrl: 'https://support.sankofa.nexus/event-bus',
metadata: {
apiEndpoints: [
'POST /events/publish',
'GET /events/consume',
'POST /events/replay'
],
features: [
'DB Outbox pattern',
'Event versioning',
'Consumer idempotency',
'Replay support',
'Multiple backends (Kafka/Redpanda/NATS)',
'Offset tracking',
'Correlation ID support'
],
compliance: ['SOC 2'],
providerAdapters: ['Kafka', 'Redpanda', 'NATS'],
sla: {
uptime: '99.95%',
latency: '<100ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/event-bus.md'
},
pricingType: 'SUBSCRIPTION',
pricingConfig: {
basePrice: 149,
currency: 'USD',
billingPeriod: 'MONTHLY',
usageRates: {
perGBStorage: 0.10,
perMillionEvents: 5.00
}
}
},
{
name: 'Phoenix Audit Service',
slug: 'phoenix-audit-service',
category: 'PLATFORM_SERVICES',
description: `Immutable audit logging service with WORM (Write Once Read Many) archive for compliance.
Features immutable audit logs with who-did-what-when tracking, PII boundaries and retention policies,
and separate operational DB from analytics store. Uses CDC to stream into warehouse for compliance reporting.`,
shortDescription: 'Immutable audit logs, WORM archive, PII boundaries, compliance',
tags: ['audit', 'logging', 'compliance', 'worm', 'immutable', 'platform'],
featured: false,
iconUrl: 'https://cdn.sankofa.nexus/services/audit.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/audit',
supportUrl: 'https://support.sankofa.nexus/audit',
metadata: {
apiEndpoints: [
'POST /audit/log',
'GET /audit/query',
'GET /audit/export'
],
features: [
'Immutable logs',
'WORM archive',
'PII boundaries',
'Retention policies',
'Compliance reporting',
'Access trails',
'CDC to warehouse'
],
compliance: ['SOC 2', 'GDPR', 'HIPAA', 'PCI DSS'],
providerAdapters: [],
sla: {
uptime: '99.9%',
latency: '<50ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/audit-service.md'
},
pricingType: 'USAGE_BASED',
pricingConfig: {
currency: 'USD',
usageRates: {
perGBStorage: 0.15,
perMillionLogs: 10.00
},
freeTier: {
requestsPerMonth: 100000,
features: ['Basic audit logging', '30-day retention']
}
}
},
{
name: 'Phoenix Observability Stack',
slug: 'phoenix-observability',
category: 'PLATFORM_SERVICES',
description: `Comprehensive observability service with distributed tracing, structured logs with correlation IDs,
and SLO monitoring. Features OpenTelemetry integration, distributed tracing across services,
SLOs for ledger posting, message delivery, and transaction settlement. Provides structured logging
with correlation IDs for end-to-end request tracking.`,
shortDescription: 'Distributed tracing, structured logs, SLOs, correlation IDs',
tags: ['observability', 'monitoring', 'tracing', 'logging', 'slo', 'opentelemetry', 'platform'],
featured: false,
iconUrl: 'https://cdn.sankofa.nexus/services/observability.svg',
documentationUrl: 'https://docs.sankofa.nexus/services/observability',
supportUrl: 'https://support.sankofa.nexus/observability',
metadata: {
apiEndpoints: [
'GET /observability/traces',
'GET /observability/metrics',
'GET /observability/logs',
'GET /observability/slos'
],
features: [
'Distributed tracing',
'OpenTelemetry integration',
'Structured logging',
'Correlation IDs',
'SLO monitoring',
'Metrics collection',
'Alerting'
],
compliance: ['SOC 2'],
providerAdapters: [],
sla: {
uptime: '99.9%',
latency: '<100ms p95'
},
architecture: 'docs/marketplace/sovereign-stack/observability.md'
},
pricingType: 'USAGE_BASED',
pricingConfig: {
currency: 'USD',
usageRates: {
perMetric: 0.0001,
perLog: 0.00005,
perTrace: 0.001
},
freeTier: {
requestsPerMonth: 1000000,
features: ['Basic observability', '7-day retention']
}
}
}
]
/**
 * Seeds the marketplace with the Sovereign Stack service catalog.
 *
 * For each entry in `services`: upserts the product (keyed by slug), creates
 * version 1.0.0 (marked latest), demotes other versions from "latest", and
 * inserts the pricing model. Requires migration 025 (Phoenix publisher and
 * extended categories) to have run first.
 *
 * NOTE: closes the shared DB pool in `finally`, so this is intended to run
 * as a one-shot script, not from a long-lived server process.
 *
 * @throws if the Phoenix publisher row is missing or any query fails.
 */
async function seedSovereignStackServices() {
  const db = getDb()
  try {
    logger.info('Seeding Sovereign Stack services...')
    // Resolve the Phoenix publisher created by migration 025.
    const publisherResult = await db.query(
      `SELECT id FROM publishers WHERE name = 'phoenix-cloud-services'`
    )
    if (publisherResult.rows.length === 0) {
      throw new Error('Phoenix publisher not found. Please run migration 025 first.')
    }
    const publisherId = publisherResult.rows[0].id
    logger.info(`✓ Found Phoenix publisher: ${publisherId}`)
    // Seed each service
    for (const service of services) {
      // Upsert product by slug.
      // FIX: short_description previously updated from EXCLUDED.description,
      // which clobbered the short description with the long one on re-runs.
      const productResult = await db.query(
        `INSERT INTO products (
          name, slug, category, description, short_description, publisher_id,
          status, featured, icon_url, documentation_url, support_url, metadata, tags
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
        ON CONFLICT (slug) DO UPDATE SET
          name = EXCLUDED.name,
          description = EXCLUDED.description,
          short_description = EXCLUDED.short_description,
          category = EXCLUDED.category,
          featured = EXCLUDED.featured,
          icon_url = EXCLUDED.icon_url,
          documentation_url = EXCLUDED.documentation_url,
          support_url = EXCLUDED.support_url,
          metadata = EXCLUDED.metadata,
          tags = EXCLUDED.tags,
          updated_at = NOW()
        RETURNING id`,
        [
          service.name,
          service.slug,
          service.category,
          service.description,
          service.shortDescription,
          publisherId,
          'PUBLISHED',
          service.featured,
          service.iconUrl || null,
          service.documentationUrl || null,
          service.supportUrl || null,
          JSON.stringify(service.metadata),
          service.tags
        ]
      )
      const productId = productResult.rows[0].id
      logger.info(`✓ Created/updated product: ${service.name} (${productId})`)
      // Create product version (v1.0.0) and mark it as the latest release.
      const versionResult = await db.query(
        `INSERT INTO product_versions (
          product_id, version, status, is_latest, released_at, metadata
        ) VALUES ($1, $2, $3, $4, $5, $6)
        ON CONFLICT (product_id, version) DO UPDATE SET
          status = EXCLUDED.status,
          is_latest = EXCLUDED.is_latest,
          released_at = EXCLUDED.released_at,
          updated_at = NOW()
        RETURNING id`,
        [
          productId,
          '1.0.0',
          'PUBLISHED',
          true,
          new Date(),
          JSON.stringify({ initialRelease: true })
        ]
      )
      const versionId = versionResult.rows[0].id
      // Demote any other versions of this product from "latest".
      await db.query(
        `UPDATE product_versions
        SET is_latest = FALSE
        WHERE product_id = $1 AND id != $2`,
        [productId, versionId]
      )
      // Insert pricing model. ON CONFLICT DO NOTHING means pricing changes
      // are NOT applied on re-runs — first-write-wins for pricing rows.
      // FIX: use ?? instead of || for basePrice/billingPeriod so a
      // legitimate basePrice of 0 is not coerced to NULL.
      await db.query(
        `INSERT INTO pricing_models (
          product_id, product_version_id, pricing_type, base_price, currency,
          billing_period, usage_rates, metadata
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        ON CONFLICT DO NOTHING`,
        [
          productId,
          versionId,
          service.pricingType,
          service.pricingConfig.basePrice ?? null,
          service.pricingConfig.currency ?? 'USD',
          service.pricingConfig.billingPeriod ?? null,
          JSON.stringify(service.pricingConfig.usageRates || {}),
          JSON.stringify({
            freeTier: service.pricingConfig.freeTier || null,
            ...service.pricingConfig
          })
        ]
      )
      logger.info(`✓ Created pricing model for: ${service.name}`)
    }
    logger.info(`✓ Successfully seeded ${services.length} Sovereign Stack services!`)
  } catch (error) {
    logger.error('Seeding error', { error })
    throw error
  } finally {
    // One-shot script: release the pool so the process can exit cleanly.
    await db.end()
  }
}
// Run if called directly (check if this is the main module).
// NOTE(review): the argv fallbacks make this a heuristic — any process whose
// argv[1] merely contains "sovereign_stack_services" (e.g. a test runner
// invoked on this file's directory) would also trigger the seed; confirm
// this is intended before relying on it.
const isMainModule = import.meta.url === `file://${process.argv[1]}` ||
  process.argv[1]?.includes('sovereign_stack_services') ||
  process.argv[1]?.endsWith('sovereign_stack_services.ts')
if (isMainModule) {
  // Fire-and-forget entry point: log and exit non-zero on failure.
  seedSovereignStackServices().catch((error) => {
    logger.error('Failed to seed Sovereign Stack services', { error })
    process.exit(1)
  })
}
export { seedSovereignStackServices, services }

View File

@@ -213,15 +213,51 @@ export function requireJWTSecret(): string {
/**
* Validates database password specifically
* Relaxed requirements for development mode
*/
export function requireDatabasePassword(): string {
return requireProductionSecret(
process.env.DB_PASSWORD,
'DB_PASSWORD',
{
minLength: 32,
const isProduction = process.env.NODE_ENV === 'production' ||
process.env.ENVIRONMENT === 'production' ||
process.env.PRODUCTION === 'true'
if (isProduction) {
return requireProductionSecret(
process.env.DB_PASSWORD,
'DB_PASSWORD',
{
minLength: 32,
}
)
} else {
// Development mode: relaxed requirements
// Still validate but allow shorter passwords for local development
const password = process.env.DB_PASSWORD
if (!password) {
throw new SecretValidationError(
'DB_PASSWORD is required but not provided. Please set it in your .env file.',
'MISSING_SECRET',
{ minLength: 8, requireUppercase: false, requireLowercase: false, requireNumbers: false, requireSpecialChars: false }
)
}
)
// Basic validation for dev (just check it's not empty and not insecure)
if (password.length < 8) {
throw new SecretValidationError(
'DB_PASSWORD must be at least 8 characters long for development',
'INSUFFICIENT_LENGTH',
{ minLength: 8 }
)
}
if (INSECURE_SECRETS.includes(password.toLowerCase().trim())) {
throw new SecretValidationError(
'DB_PASSWORD uses an insecure default value',
'INSECURE_DEFAULT'
)
}
return password
}
}
/**

View File

@@ -26,13 +26,44 @@ declare module 'fastify' {
}
/**
* Extract tenant context from request
* Resolve tenant context from X-API-Key (for /api/v1/* client and partner API access).
* Uses api_keys table: key hash, tenant_id, permissions.
*/
/**
 * Resolve tenant context from an X-API-Key header (client/partner access on
 * /api/v1/*), backed by the api_keys table via verifyApiKey.
 *
 * @returns a TenantContext with role 'API_KEY' and the key's permissions as
 *          scopes, or null when the header is absent/blank or the key fails
 *          verification.
 */
async function extractTenantContextFromApiKey(
  request: FastifyRequest
): Promise<TenantContext | null> {
  // Node lower-cases all incoming header names, so only the 'x-api-key'
  // form can ever be present; the mixed-case 'X-API-Key' lookup was dead code.
  const apiKey = request.headers['x-api-key'] as string | undefined
  if (!apiKey?.trim()) return null
  // Dynamic import — presumably defers a module cycle with the service
  // layer at load time; confirm before inlining as a static import.
  const { verifyApiKey } = await import('../services/api-key.js')
  const result = await verifyApiKey(apiKey.trim())
  if (!result) return null
  return {
    tenantId: result.tenantId ?? undefined,
    userId: result.userId,
    email: '',
    role: 'API_KEY',
    permissions: { scopes: result.permissions },
    isSystemAdmin: false,
  }
}
/**
* Extract tenant context from request (JWT or X-API-Key for /api/v1/*)
*/
export async function extractTenantContext(
request: FastifyRequest
): Promise<TenantContext | null> {
// Get token from Authorization header
const authHeader = request.headers.authorization
const isRailingPath = typeof request.url === 'string' && request.url.startsWith('/api/v1')
// For /api/v1/*, allow X-API-Key when no Bearer token
if (isRailingPath && (!authHeader || !authHeader.startsWith('Bearer '))) {
const apiKeyContext = await extractTenantContextFromApiKey(request)
if (apiKeyContext) return apiKeyContext
return null
}
// JWT path
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return null
}

View File

@@ -0,0 +1,98 @@
/**
* Phoenix API Railing — REST routes for Infra/VE/Health proxy and tenant-scoped Client API.
* When PHOENIX_RAILING_URL is set, /api/v1/infra/*, /api/v1/ve/*, /api/v1/health/* proxy to it.
* /api/v1/tenants/me/* are tenant-scoped (from tenantContext).
*/
import { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify'
const RAILING_URL = (process.env.PHOENIX_RAILING_URL || '').replace(/\/$/, '')
const RAILING_API_KEY = process.env.PHOENIX_RAILING_API_KEY || ''
/**
 * Forward the incoming request to the Phoenix railing at `path`, preserving
 * the query string and (for body-carrying methods) the JSON body.
 *
 * Auth precedence: a configured PHOENIX_RAILING_API_KEY wins; otherwise the
 * caller's own Authorization header is passed through unchanged.
 *
 * Responds 503 when no railing is configured, 502 when the upstream call
 * itself fails, and otherwise mirrors the upstream status + JSON payload.
 */
async function proxyToRailing(
  request: FastifyRequest<{ Params: Record<string, string>; Querystring: Record<string, string> }>,
  reply: FastifyReply,
  path: string
) {
  if (!RAILING_URL) {
    return reply.status(503).send({
      error: 'Phoenix railing not configured',
      message: 'Set PHOENIX_RAILING_URL to the Phoenix Deploy API or Phoenix API base URL',
    })
  }

  const qs = new URLSearchParams(request.query as Record<string, string>).toString()
  const url = `${RAILING_URL}${path}${qs ? `?${qs}` : ''}`

  const headers: Record<string, string> = {
    // Honor the caller's explicit content type; default to JSON.
    'Content-Type': (request.headers['content-type'] as string) || 'application/json',
  }
  if (RAILING_API_KEY) headers['Authorization'] = `Bearer ${RAILING_API_KEY}`
  else if (request.headers.authorization) headers['Authorization'] = request.headers.authorization

  // GET and HEAD requests must not carry a body (fetch rejects them if they
  // do); `!= null` keeps legitimate falsy bodies such as `false` or `0`.
  const hasBody = request.method !== 'GET' && request.method !== 'HEAD' && request.body != null

  try {
    const res = await fetch(url, {
      method: request.method,
      headers,
      body: hasBody ? JSON.stringify(request.body) : undefined,
    })
    // Tolerate non-JSON upstream bodies (e.g. an empty 204) by sending {}.
    const data = await res.json().catch(() => ({}))
    return reply.status(res.status).send(data)
  } catch (err: unknown) {
    // Network-level failure reaching the railing — surface as 502 Bad Gateway.
    const message = err instanceof Error ? err.message : 'Railing proxy failed'
    return reply.status(502).send({ error: message })
  }
}
/**
 * Register Phoenix API Railing routes on the Fastify instance.
 *
 * Proxy routes (/api/v1/infra/*, /api/v1/ve/*, /api/v1/health/*) are only
 * registered when PHOENIX_RAILING_URL was set at module load; the
 * tenant-scoped routes (/api/v1/tenants/me/*) are always available.
 */
export async function registerPhoenixRailingRoutes(fastify: FastifyInstance) {
  if (RAILING_URL) {
    // Read-only GET endpoints that forward verbatim to the railing.
    const passthroughGets = [
      '/api/v1/infra/nodes',
      '/api/v1/infra/storage',
      '/api/v1/ve/vms',
      '/api/v1/health/metrics',
      '/api/v1/health/alerts',
      '/api/v1/health/summary',
    ]
    for (const path of passthroughGets) {
      fastify.get(path, async (request, reply) => proxyToRailing(request as any, reply, path))
    }

    // VM status, addressed by node + vmid. Path segments are URL-encoded so a
    // malicious param value cannot rewrite the upstream path.
    fastify.get('/api/v1/ve/vms/:node/:vmid/status', async (request, reply) => {
      const { node, vmid } = request.params as { node: string; vmid: string }
      return proxyToRailing(
        request as any,
        reply,
        `/api/v1/ve/vms/${encodeURIComponent(node)}/${encodeURIComponent(vmid)}/status`
      )
    })

    // The three VM lifecycle actions share one handler shape; register them in
    // a loop instead of three copy-pasted closures.
    for (const action of ['start', 'stop', 'reboot'] as const) {
      fastify.post(`/api/v1/ve/vms/:node/:vmid/${action}`, async (request, reply) => {
        const { node, vmid } = request.params as { node: string; vmid: string }
        return proxyToRailing(
          request as any,
          reply,
          `/api/v1/ve/vms/${encodeURIComponent(node)}/${encodeURIComponent(vmid)}/${action}`
        )
      })
    }
  }

  // Tenant-scoped resource inventory (requires tenantContext from JWT/API key).
  fastify.get('/api/v1/tenants/me/resources', async (request, reply) => {
    const tenantContext = (request as any).tenantContext
    if (!tenantContext?.tenantId) {
      return reply.status(401).send({ error: 'Tenant context required', message: 'Use API key or JWT with tenant scope' })
    }
    const db = (await import('../db/index.js')).getDb()
    const result = await db.query(
      'SELECT id, name, resource_type, provider, provider_id, site_id, metadata, created_at FROM resource_inventory WHERE tenant_id = $1 ORDER BY created_at DESC',
      [tenantContext.tenantId]
    )
    return reply.send({ resources: result.rows, tenantId: tenantContext.tenantId })
  })

  // Tenant health. NOTE(review): when a railing is configured this proxies the
  // *global* /api/v1/health/summary, not a tenant-filtered view — confirm the
  // upstream scopes results by credential before exposing to tenant callers.
  fastify.get('/api/v1/tenants/me/health', async (request, reply) => {
    const tenantContext = (request as any).tenantContext
    if (!tenantContext?.tenantId) {
      return reply.status(401).send({ error: 'Tenant context required' })
    }
    if (RAILING_URL) {
      return proxyToRailing(request as any, reply, '/api/v1/health/summary')
    }
    return reply.send({
      tenantId: tenantContext.tenantId,
      status: 'unknown',
      updated_at: new Date().toISOString(),
      message: 'Set PHOENIX_RAILING_URL for full health summary',
    })
  })
}

View File

@@ -1282,6 +1282,11 @@ export const typeDefs = gql`
FINANCIAL_MESSAGING
INTERNET_REGISTRY
AI_LLM_AGENT
LEDGER_SERVICES
IDENTITY_SERVICES
WALLET_SERVICES
ORCHESTRATION_SERVICES
PLATFORM_SERVICES
}
enum ProductStatus {

View File

@@ -17,6 +17,8 @@ import { logger } from './lib/logger'
import { validateAllSecrets } from './lib/secret-validation'
import { initializeFIPS } from './lib/crypto'
import { getFastifyTLSOptions } from './lib/tls-config'
import { registerPhoenixRailingRoutes } from './routes/phoenix-railing.js'
import { printSchema } from 'graphql'
// Get TLS configuration (empty if certificates not available)
const tlsOptions = getFastifyTLSOptions()
@@ -111,6 +113,39 @@ async function startServer() {
return { status: 'ok', timestamp: new Date().toISOString() }
})
// GraphQL schema export (SDL) for docs and codegen
fastify.get('/graphql/schema', async (_request, reply) => {
reply.type('text/plain').send(printSchema(schema))
})
// GraphQL Playground (interactive docs) — redirect to Apollo Sandbox or show schema link
fastify.get('/graphql-playground', async (_request, reply) => {
const base = process.env.PUBLIC_URL || 'http://localhost:4000'
reply.type('text/html').send(`
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>Phoenix API — GraphQL</title>
<style>
body { font-family: system-ui; padding: 2rem; max-width: 48rem; margin: 0 auto; }
a { color: #0d9488; }
code { background: #f1f5f9; padding: 0.2em 0.4em; border-radius: 4px; }
</style>
</head>
<body>
<h1>Phoenix API — GraphQL</h1>
<p><strong>Endpoint:</strong> <code>${base}/graphql</code></p>
<p><a href="${base}/graphql/schema">Schema (SDL)</a></p>
<p>Use <a href="https://studio.apollographql.com/sandbox/explorer?endpoint=${encodeURIComponent(base + '/graphql')}" target="_blank" rel="noopener">Apollo Sandbox</a> or any GraphQL client with the endpoint above.</p>
</body>
</html>
`)
})
// Phoenix API Railing: /api/v1/infra/*, /api/v1/ve/*, /api/v1/health/* proxy + /api/v1/tenants/me/*
await registerPhoenixRailingRoutes(fastify)
// Start Fastify server
const port = parseInt(process.env.PORT || '4000', 10)
const host = process.env.HOST || '0.0.0.0'

View File

@@ -0,0 +1,191 @@
/**
* Phoenix Audit Service
* Immutable audit logs, WORM archive, PII boundaries, compliance
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
/** A single immutable audit record as stored in `audit_logs`. */
export interface AuditLog {
  logId: string
  userId: string | null // null for system-initiated actions
  action: string
  resourceType: string
  resourceId: string
  details: Record<string, any> // PII-scrubbed before persistence
  timestamp: Date
  ipAddress?: string
  userAgent?: string
}

/** Filter set for querying audit logs; present fields are AND-ed. */
export interface AuditQuery {
  userId?: string
  action?: string
  resourceType?: string
  resourceId?: string
  startDate?: Date
  endDate?: Date
  limit?: number // defaults to 1000
}

class AuditService {
  /**
   * Create an immutable audit log entry.
   *
   * Details are PII-scrubbed, the row is inserted into `audit_logs`, and the
   * persisted record is forwarded to WORM archival.
   *
   * @returns the persisted, mapped audit log.
   */
  async log(
    action: string,
    resourceType: string,
    resourceId: string,
    details: Record<string, any>,
    userId?: string,
    ipAddress?: string,
    userAgent?: string
  ): Promise<AuditLog> {
    const db = getDb()

    // Scrub PII before anything reaches storage.
    const scrubbedDetails = this.scrubPII(details)

    const result = await db.query(
      `INSERT INTO audit_logs (
        user_id, action, resource_type, resource_id, details, ip_address, user_agent, timestamp
      ) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
      RETURNING *`,
      [
        userId || null,
        action,
        resourceType,
        resourceId,
        JSON.stringify(scrubbedDetails),
        ipAddress || null,
        userAgent || null
      ]
    )

    logger.info('Audit log created', { logId: result.rows[0].id, action })

    // Archive to WORM storage for compliance retention.
    await this.archiveToWORM(result.rows[0])

    return this.mapAuditLog(result.rows[0])
  }

  /**
   * Query audit logs with optional AND-ed filters, newest first.
   * Applies `query.limit` (default 1000).
   */
  async query(query: AuditQuery): Promise<AuditLog[]> {
    const db = getDb()
    const conditions: string[] = []
    const params: any[] = []
    let paramIndex = 1

    if (query.userId) {
      conditions.push(`user_id = $${paramIndex++}`)
      params.push(query.userId)
    }
    if (query.action) {
      conditions.push(`action = $${paramIndex++}`)
      params.push(query.action)
    }
    if (query.resourceType) {
      conditions.push(`resource_type = $${paramIndex++}`)
      params.push(query.resourceType)
    }
    if (query.resourceId) {
      conditions.push(`resource_id = $${paramIndex++}`)
      params.push(query.resourceId)
    }
    if (query.startDate) {
      conditions.push(`timestamp >= $${paramIndex++}`)
      params.push(query.startDate)
    }
    if (query.endDate) {
      conditions.push(`timestamp <= $${paramIndex++}`)
      params.push(query.endDate)
    }

    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''
    const limit = query.limit || 1000
    params.push(limit)

    const result = await db.query(
      `SELECT * FROM audit_logs
      ${whereClause}
      ORDER BY timestamp DESC
      LIMIT $${paramIndex}`,
      params
    )

    // Call through an arrow rather than passing the method reference unbound,
    // so `this` stays available if mapAuditLog ever needs it.
    return result.rows.map((row: any) => this.mapAuditLog(row))
  }

  /**
   * Export audit logs for a compliance window.
   *
   * @param format 'JSON' (pretty-printed) or 'CSV' (RFC 4180-escaped cells).
   */
  async exportForCompliance(
    startDate: Date,
    endDate: Date,
    format: 'JSON' | 'CSV' = 'JSON'
  ): Promise<string> {
    const logs = await this.query({ startDate, endDate, limit: 1000000 })

    if (format === 'JSON') {
      return JSON.stringify(logs, null, 2)
    }

    // CSV: fields such as action/resourceType may themselves contain commas
    // or quotes, so every cell is escaped (the previous version emitted cells
    // raw and produced malformed rows for such values).
    const headers = ['logId', 'userId', 'action', 'resourceType', 'resourceId', 'timestamp']
    const rows = logs.map(log => [
      log.logId,
      log.userId || '',
      log.action,
      log.resourceType,
      log.resourceId,
      log.timestamp.toISOString()
    ])
    return [
      headers.join(','),
      ...rows.map(row => row.map(cell => this.csvEscape(String(cell))).join(','))
    ].join('\n')
  }

  /** Quote a CSV cell per RFC 4180 when it contains a comma, quote, or newline. */
  private csvEscape(value: string): string {
    return /[",\n\r]/.test(value) ? `"${value.replace(/"/g, '""')}"` : value
  }

  /**
   * Remove/redact PII from a details payload.
   * NOTE(review): placeholder — only shallow-copies and masks `cardNumber`;
   * nested objects and other PII classes (SSN, email, …) pass through
   * untouched. Replace with the real PII-boundary scrubber before relying on
   * this for compliance.
   */
  private scrubPII(data: Record<string, any>): Record<string, any> {
    const scrubbed = { ...data } // shallow copy: nested objects remain shared
    if (scrubbed.cardNumber) {
      scrubbed.cardNumber = '***REDACTED***'
    }
    return scrubbed
  }

  /**
   * Forward a persisted row to WORM (Write Once Read Many) storage.
   * Placeholder — would write to immutable storage (S3 object lock, etc.).
   */
  private async archiveToWORM(log: any): Promise<void> {
    logger.info('Archiving to WORM storage', { logId: log.id })
  }

  /** Map a DB row (snake_case columns) to the public AuditLog shape. */
  private mapAuditLog(row: any): AuditLog {
    return {
      logId: row.id,
      userId: row.user_id,
      action: row.action,
      resourceType: row.resource_type,
      resourceId: row.resource_id,
      details: typeof row.details === 'string' ? JSON.parse(row.details) : row.details,
      timestamp: row.timestamp,
      ipAddress: row.ip_address,
      userAgent: row.user_agent
    }
  }
}

export const auditService = new AuditService()

View File

@@ -0,0 +1,188 @@
/**
* Phoenix Event Bus Service
* Durable events, replay, versioning, consumer idempotency
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
/** A durable domain event as stored in the `events` table. */
export interface Event {
  eventId: string
  eventType: string
  aggregateId: string
  version: number // per (aggregate_id, event_type) sequence, starting at 1
  payload: Record<string, any>
  metadata: Record<string, any>
  timestamp: Date
  correlationId: string
}

/** Per-consumer bookmark of the last processed event. */
export interface ConsumerOffset {
  consumerId: string
  eventId: string
  processedAt: Date
}

/**
 * Event bus built on the transactional-outbox pattern: publishes land in
 * `event_outbox`, are forwarded to the actual bus, and are copied into the
 * durable `events` table; consumers track progress in `consumer_offsets`.
 */
class EventBusService {
  /**
   * Publish an event via the outbox.
   *
   * NOTE(review): the version is computed as MAX(version)+1 in a separate
   * query, which can race under concurrent publishers unless serialized by a
   * unique constraint or transaction — confirm the schema guarantees this.
   */
  async publishEvent(
    eventType: string,
    aggregateId: string,
    payload: Record<string, any>,
    correlationId: string,
    metadata: Record<string, any> = {}
  ): Promise<Event> {
    const db = getDb()

    // Next version for this (aggregate, event type) stream.
    const versionResult = await db.query(
      `SELECT COALESCE(MAX(version), 0) + 1 as next_version
      FROM events
      WHERE aggregate_id = $1 AND event_type = $2`,
      [aggregateId, eventType]
    )
    const version = parseInt(versionResult.rows[0].next_version)

    // Insert into the outbox (intended to be atomic with business writes).
    const result = await db.query(
      `INSERT INTO event_outbox (
        event_type, aggregate_id, version, payload, metadata, correlation_id, status
      ) VALUES ($1, $2, $3, $4, $5, $6, 'PENDING')
      RETURNING *`,
      [
        eventType,
        aggregateId,
        version,
        JSON.stringify(payload),
        JSON.stringify(metadata),
        correlationId
      ]
    )

    logger.info('Event published to outbox', {
      eventId: result.rows[0].id,
      eventType,
      correlationId
    })

    // Drain the outbox inline. NOTE(review): in production this should be a
    // background worker; the inline call adds latency to every publish.
    await this.processOutbox()

    return this.mapEvent(result.rows[0])
  }

  /**
   * Drain up to 100 PENDING outbox rows: forward each to the bus, mark it
   * PUBLISHED, and copy it into `events` (idempotent via ON CONFLICT).
   * Failures are logged and left PENDING for the next pass.
   */
  async processOutbox(): Promise<void> {
    const db = getDb()

    const pending = await db.query(
      `SELECT * FROM event_outbox WHERE status = 'PENDING' ORDER BY created_at LIMIT 100`
    )

    for (const event of pending.rows) {
      try {
        // Publish to the actual event bus (Kafka/Redpanda/NATS).
        await this.publishToBus(event)

        // Mark as published.
        await db.query(
          `UPDATE event_outbox SET status = 'PUBLISHED', published_at = NOW() WHERE id = $1`,
          [event.id]
        )

        // Copy into the durable events table; ON CONFLICT makes retries safe.
        await db.query(
          `INSERT INTO events (
            event_id, event_type, aggregate_id, version, payload, metadata, correlation_id, timestamp
          ) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
          ON CONFLICT (event_id) DO NOTHING`,
          [
            event.id,
            event.event_type,
            event.aggregate_id,
            event.version,
            event.payload,
            event.metadata,
            event.correlation_id
          ]
        )

        logger.info('Event processed from outbox', { eventId: event.id })
      } catch (error) {
        // Row stays PENDING and is retried on the next drain.
        logger.error('Failed to process event from outbox', { eventId: event.id, error })
        // Would implement retry logic here
      }
    }
  }

  /**
   * Consume events of one type with at-most-once offset tracking.
   *
   * NOTE(review): pagination compares `id > lastEventId` while ordering by
   * `timestamp` — this is only correct if ids are monotonically increasing in
   * timestamp order (e.g. serial PKs, not random UUIDs). Confirm the id type.
   */
  async consumeEvents(
    consumerId: string,
    eventType: string,
    limit: number = 100
  ): Promise<Event[]> {
    const db = getDb()

    // Last processed event for this consumer/type, if any.
    const lastOffset = await db.query(
      `SELECT event_id FROM consumer_offsets
      WHERE consumer_id = $1 AND event_type = $2
      ORDER BY processed_at DESC LIMIT 1`,
      [consumerId, eventType]
    )
    const lastEventId = lastOffset.rows[0]?.event_id || null

    // Fetch events after the bookmark (or from the start on first run).
    const query = lastEventId
      ? `SELECT * FROM events
        WHERE event_type = $1 AND id > $2
        ORDER BY timestamp ASC LIMIT $3`
      : `SELECT * FROM events
        WHERE event_type = $1
        ORDER BY timestamp ASC LIMIT $2`
    const params = lastEventId ? [eventType, lastEventId, limit] : [eventType, limit]
    const result = await db.query(query, params)

    // Record offsets; ON CONFLICT keeps redelivery idempotent.
    for (const event of result.rows) {
      await db.query(
        `INSERT INTO consumer_offsets (consumer_id, event_id, event_type, processed_at)
        VALUES ($1, $2, $3, NOW())
        ON CONFLICT (consumer_id, event_id) DO NOTHING`,
        [consumerId, event.id, eventType]
      )
    }

    // Safe to pass unbound: mapEvent does not reference `this`.
    return result.rows.map(this.mapEvent)
  }

  /** Forward one outbox row to the real bus. Placeholder (Kafka/Redpanda/NATS). */
  private async publishToBus(event: any): Promise<void> {
    logger.info('Publishing to event bus', { eventId: event.id })
    // Placeholder - would implement actual bus publishing
  }

  /** Map a DB row (outbox or events table) to the public Event shape. */
  private mapEvent(row: any): Event {
    return {
      eventId: row.id || row.event_id,
      eventType: row.event_type,
      aggregateId: row.aggregate_id,
      version: row.version,
      payload: typeof row.payload === 'string' ? JSON.parse(row.payload) : row.payload,
      metadata: typeof row.metadata === 'string' ? JSON.parse(row.metadata) : row.metadata,
      timestamp: row.timestamp || row.created_at,
      correlationId: row.correlation_id
    }
  }
}

export const eventBusService = new EventBusService()

View File

@@ -0,0 +1,182 @@
/**
* Phoenix Identity Service (Sovereign Stack)
* Extends the base identity service with marketplace-specific features
* Users, orgs, roles, permissions, device binding, passkeys, OAuth/OIDC
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
import { identityService } from '../identity.js'
/** Marketplace view of a user, joined with roles and permissions. */
export interface User {
  userId: string
  email: string
  name: string
  roles: string[]
  permissions: Record<string, any>
  orgId: string | null
}

/** A tenant organization. */
export interface Organization {
  orgId: string
  name: string
  domain: string | null
  status: 'ACTIVE' | 'SUSPENDED'
}

/** A device fingerprint bound to a user account. */
export interface DeviceBinding {
  deviceId: string
  userId: string
  deviceType: string
  fingerprint: string
  lastUsed: Date
}

/**
 * Marketplace-specific identity layer on top of the base identity service:
 * mirrors users locally, manages orgs, role assignment, and device binding.
 */
class SovereignIdentityService {
  /**
   * Create (or refresh) a user: delegates account creation to the base
   * identity service, then upserts the marketplace mirror row.
   *
   * NOTE(review): assumes identityService.createUser returns an object with
   * an `id` usable as user_id — confirm against the base service's contract.
   */
  async createUser(
    email: string,
    name: string,
    orgId?: string
  ): Promise<User> {
    const db = getDb()

    // Use base identity service for Keycloak integration.
    const keycloakUser = await identityService.createUser(email, name)

    // Upsert the local mirror used by marketplace features.
    const result = await db.query(
      `INSERT INTO marketplace_users (user_id, email, name, org_id)
      VALUES ($1, $2, $3, $4)
      ON CONFLICT (user_id) DO UPDATE SET
        email = EXCLUDED.email,
        name = EXCLUDED.name,
        org_id = EXCLUDED.org_id
      RETURNING *`,
      [keycloakUser.id, email, name, orgId || null]
    )

    return this.mapUser(result.rows[0])
  }

  /** Create an organization (status starts ACTIVE). */
  async createOrganization(
    name: string,
    domain?: string
  ): Promise<Organization> {
    const db = getDb()

    const result = await db.query(
      `INSERT INTO organizations (name, domain, status)
      VALUES ($1, $2, 'ACTIVE')
      RETURNING *`,
      [name, domain || null]
    )

    logger.info('Organization created', { orgId: result.rows[0].id })
    return this.mapOrganization(result.rows[0])
  }

  /**
   * Bind a device fingerprint to a user; rebinding an existing fingerprint
   * just refreshes `last_used`.
   */
  async bindDevice(
    userId: string,
    deviceType: string,
    fingerprint: string
  ): Promise<DeviceBinding> {
    const db = getDb()

    const result = await db.query(
      `INSERT INTO device_bindings (user_id, device_type, fingerprint, last_used)
      VALUES ($1, $2, $3, NOW())
      ON CONFLICT (user_id, fingerprint) DO UPDATE SET
        last_used = NOW()
      RETURNING *`,
      [userId, deviceType, fingerprint]
    )

    logger.info('Device bound', { deviceId: result.rows[0].id, userId })
    return this.mapDeviceBinding(result.rows[0])
  }

  /**
   * Fetch a user with aggregated roles and permissions, or null if absent.
   *
   * NOTE(review): this selects/groups by `o.org_id`, but createOrganization
   * reads the PK as `id` — confirm the organizations column name (likely
   * `o.id AS org_id`). Also, `jsonb_object_agg` over LEFT-JOIN NULL keys can
   * error in Postgres when a user has no permission rows — verify with a
   * permissionless user.
   */
  async getUser(userId: string): Promise<User | null> {
    const db = getDb()

    const result = await db.query(
      `SELECT
        u.*,
        o.org_id,
        ARRAY_AGG(DISTINCT r.role_name) as roles,
        jsonb_object_agg(DISTINCT p.permission_key, p.permission_value) as permissions
      FROM marketplace_users u
      LEFT JOIN organizations o ON u.org_id = o.id
      LEFT JOIN user_roles r ON u.user_id = r.user_id
      LEFT JOIN user_permissions p ON u.user_id = p.user_id
      WHERE u.user_id = $1
      GROUP BY u.user_id, o.org_id`,
      [userId]
    )

    if (result.rows.length === 0) {
      return null
    }

    return this.mapUser(result.rows[0])
  }

  /** Assign a role to a user; duplicate assignment is a no-op. */
  async assignRole(userId: string, roleName: string): Promise<void> {
    const db = getDb()

    await db.query(
      `INSERT INTO user_roles (user_id, role_name)
      VALUES ($1, $2)
      ON CONFLICT (user_id, role_name) DO NOTHING`,
      [userId, roleName]
    )

    logger.info('Role assigned', { userId, roleName })
  }

  /** Map a DB row to the public User shape. */
  private mapUser(row: any): User {
    return {
      userId: row.user_id,
      email: row.email,
      name: row.name,
      roles: row.roles || [],
      permissions: row.permissions || {},
      orgId: row.org_id
    }
  }

  /** Map a DB row to the public Organization shape. */
  private mapOrganization(row: any): Organization {
    return {
      orgId: row.id,
      name: row.name,
      domain: row.domain,
      status: row.status
    }
  }

  /** Map a DB row to the public DeviceBinding shape. */
  private mapDeviceBinding(row: any): DeviceBinding {
    return {
      deviceId: row.id,
      userId: row.user_id,
      deviceType: row.device_type,
      fingerprint: row.fingerprint,
      lastUsed: row.last_used
    }
  }
}

export const sovereignIdentityService = new SovereignIdentityService()

View File

@@ -0,0 +1,175 @@
/**
* Phoenix Ledger Service
* Double-entry ledger with virtual accounts, holds, and multi-asset support
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
/** A posted journal entry; `lines` are loaded separately (empty here). */
export interface JournalEntry {
  entryId: string
  timestamp: Date
  description: string
  correlationId: string
  lines: JournalLine[]
}

/** One side of a double-entry posting. */
export interface JournalLine {
  accountRef: string
  debit: number
  credit: number
  asset: string
}

/** A labeled virtual subaccount of a ledger account. */
export interface VirtualAccount {
  subaccountId: string
  accountId: string
  currency: string
  asset: string
  labels: Record<string, string>
}

/** A reservation of funds on an account. */
export interface Hold {
  holdId: string
  amount: number
  asset: string
  expiry: Date | null // null = never expires
  status: 'ACTIVE' | 'RELEASED' | 'EXPIRED'
}

/** A computed balance for an account (optionally a subaccount) and asset. */
export interface Balance {
  accountId: string
  subaccountId: string | null
  asset: string
  balance: number
}

class LedgerService {
  /**
   * Create a journal entry, idempotent on `correlationId`: replaying the same
   * correlation returns the existing entry instead of double-posting.
   *
   * Validates the double-entry invariant (total debits == total credits,
   * within a 0.01 tolerance) before writing.
   *
   * NOTE(review): amounts are IEEE floats — for real money a fixed-point /
   * minor-units representation is safer; confirm the column types. The entry
   * and its lines are also inserted by separate statements rather than one
   * transaction, so a crash mid-loop leaves a partial entry — wrap in a
   * transaction if the db layer exposes one.
   *
   * @throws Error when the lines do not balance.
   */
  async createJournalEntry(
    correlationId: string,
    description: string,
    lines: JournalLine[]
  ): Promise<JournalEntry> {
    const db = getDb()

    // Idempotency: a correlation that was already posted is returned as-is.
    const existing = await db.query(
      `SELECT * FROM journal_entries WHERE correlation_id = $1`,
      [correlationId]
    )
    if (existing.rows.length > 0) {
      logger.info('Journal entry already exists', { correlationId })
      return this.mapJournalEntry(existing.rows[0])
    }

    // Double-entry invariant: debits must equal credits.
    const totalDebits = lines.reduce((sum, line) => sum + line.debit, 0)
    const totalCredits = lines.reduce((sum, line) => sum + line.credit, 0)
    if (Math.abs(totalDebits - totalCredits) > 0.01) {
      throw new Error('Journal entry is not balanced')
    }

    const result = await db.query(
      `INSERT INTO journal_entries (correlation_id, description, timestamp)
      VALUES ($1, $2, NOW())
      RETURNING *`,
      [correlationId, description]
    )
    const entryId = result.rows[0].id

    for (const line of lines) {
      await db.query(
        `INSERT INTO journal_lines (entry_id, account_ref, debit, credit, asset)
        VALUES ($1, $2, $3, $4, $5)`,
        [entryId, line.accountRef, line.debit, line.credit, line.asset]
      )
    }

    logger.info('Journal entry created', { entryId, correlationId })
    return this.mapJournalEntry(result.rows[0])
  }

  /** Create an ACTIVE hold (reservation) on an account. */
  async createHold(
    accountId: string,
    amount: number,
    asset: string,
    expiry: Date | null = null
  ): Promise<Hold> {
    const db = getDb()

    const result = await db.query(
      `INSERT INTO holds (account_id, amount, asset, expiry, status)
      VALUES ($1, $2, $3, $4, 'ACTIVE')
      RETURNING *`,
      [accountId, amount, asset, expiry]
    )

    logger.info('Hold created', { holdId: result.rows[0].id })
    return this.mapHold(result.rows[0])
  }

  /**
   * Compute balances from journal lines for an account, optionally narrowed
   * to a subaccount and/or asset.
   *
   * The previous version hard-coded `$2`/`$3` placeholders, so passing
   * `asset` without `subaccountId` referenced a parameter that was never
   * bound (runtime error). Placeholders are now numbered from the params
   * actually pushed.
   *
   * NOTE(review): `account_ref = $1` combined with the subaccount LIKE
   * pattern can only match rows satisfying both — confirm the intended
   * account_ref encoding ("acct" vs "acct:sub").
   */
  async getBalance(accountId: string, subaccountId?: string, asset?: string): Promise<Balance[]> {
    const db = getDb()

    const conditions = ['account_ref = $1']
    const params: any[] = [accountId]
    if (subaccountId) {
      params.push(`${accountId}:${subaccountId}`)
      conditions.push(`account_ref LIKE $${params.length}`)
    }
    if (asset) {
      params.push(asset)
      conditions.push(`asset = $${params.length}`)
    }

    const result = await db.query(
      `SELECT
        account_ref as account_id,
        asset,
        SUM(debit - credit) as balance
      FROM journal_lines
      WHERE ${conditions.join(' AND ')}
      GROUP BY account_ref, asset`,
      params
    )

    return result.rows.map((row: any) => ({
      accountId: row.account_id,
      subaccountId: subaccountId || null,
      asset: row.asset,
      balance: parseFloat(row.balance)
    }))
  }

  /** Map a DB row to the public JournalEntry shape (lines loaded separately). */
  private mapJournalEntry(row: any): JournalEntry {
    return {
      entryId: row.id,
      timestamp: row.timestamp,
      description: row.description,
      correlationId: row.correlation_id,
      lines: [] // Would be loaded separately
    }
  }

  /** Map a DB row to the public Hold shape. */
  private mapHold(row: any): Hold {
    return {
      holdId: row.id,
      amount: parseFloat(row.amount),
      asset: row.asset,
      expiry: row.expiry,
      status: row.status
    }
  }
}

export const ledgerService = new LedgerService()

View File

@@ -0,0 +1,144 @@
/**
* Phoenix Messaging Orchestrator Service
* Multi-provider messaging (SMS/voice/email/push) with failover
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
/** A request to deliver one message over a channel. */
export interface MessageRequest {
  channel: 'SMS' | 'EMAIL' | 'VOICE' | 'PUSH'
  to: string
  template: string
  params: Record<string, any>
  priority: 'LOW' | 'NORMAL' | 'HIGH'
}

/** Delivery state of a persisted message. */
export interface MessageStatus {
  messageId: string
  status: 'PENDING' | 'SENT' | 'DELIVERED' | 'FAILED'
  provider: string
  deliveryReceipt?: any
  retryCount: number
}

/** Per-channel provider preference order (first entry = primary). */
const PROVIDER_CHAINS: Record<string, string[]> = {
  SMS: ['twilio', 'aws-sns', 'vonage'],
  EMAIL: ['aws-ses', 'sendgrid'],
  VOICE: ['twilio', 'vonage'],
  PUSH: ['fcm', 'apns']
}

class MessagingOrchestratorService {
  /**
   * Send a message, walking the channel's provider chain on failure.
   *
   * Bug fix: the previous version selected a failover provider but then
   * recursed into sendMessage, which re-selected the SAME primary — the
   * failover was never used and a persistently failing provider recursed
   * without bound. Each attempt now records its own message row, and the
   * chain advances to the actual failover provider.
   *
   * @throws the last provider error once the chain is exhausted.
   */
  async sendMessage(request: MessageRequest): Promise<MessageStatus> {
    const provider = await this.selectProvider(request)
    return this.sendWithProvider(request, provider)
  }

  /** Persist one attempt with `provider`, send, and fail over on error. */
  private async sendWithProvider(request: MessageRequest, provider: string): Promise<MessageStatus> {
    const db = getDb()

    const result = await db.query(
      `INSERT INTO messages (channel, recipient, template, params, priority, provider, status)
      VALUES ($1, $2, $3, $4, $5, $6, 'PENDING')
      RETURNING *`,
      [
        request.channel,
        request.to,
        request.template,
        JSON.stringify(request.params),
        request.priority,
        provider
      ]
    )
    const messageId = result.rows[0].id

    try {
      // Send via the provider adapter.
      await this.sendViaProvider(provider, request)

      await db.query(
        `UPDATE messages SET status = 'SENT' WHERE id = $1`,
        [messageId]
      )
      logger.info('Message sent', { messageId, provider })
      return {
        messageId,
        status: 'SENT',
        provider,
        retryCount: 0
      }
    } catch (error) {
      // This attempt is dead either way; record it before failing over.
      await db.query(
        `UPDATE messages SET status = 'FAILED' WHERE id = $1`,
        [messageId]
      )

      const failoverProvider = await this.selectFailoverProvider(request, provider)
      if (failoverProvider) {
        logger.info('Retrying with failover provider', { messageId, failoverProvider })
        // The chain is finite and strictly advances, so this terminates.
        return this.sendWithProvider(request, failoverProvider)
      }
      throw error
    }
  }

  /**
   * Look up the delivery state of a message.
   * @throws Error when the message id is unknown.
   */
  async getMessageStatus(messageId: string): Promise<MessageStatus> {
    const db = getDb()

    const result = await db.query(
      `SELECT * FROM messages WHERE id = $1`,
      [messageId]
    )
    if (result.rows.length === 0) {
      throw new Error('Message not found')
    }

    const row = result.rows[0]
    return {
      messageId: row.id,
      status: row.status,
      provider: row.provider,
      deliveryReceipt: row.delivery_receipt,
      retryCount: row.retry_count || 0
    }
  }

  /**
   * Pick the primary provider for a channel.
   * Placeholder — real routing would weigh cost, deliverability, region, and
   * user preference; currently the first entry of the channel's chain.
   */
  private async selectProvider(request: MessageRequest): Promise<string> {
    return PROVIDER_CHAINS[request.channel]?.[0] || 'twilio'
  }

  /** Next provider after `failedProvider` in the channel's chain, or null. */
  private async selectFailoverProvider(request: MessageRequest, failedProvider: string): Promise<string | null> {
    const chain = PROVIDER_CHAINS[request.channel] || []
    const index = chain.indexOf(failedProvider)
    return index >= 0 && index < chain.length - 1 ? chain[index + 1] : null
  }

  /** Dispatch to the concrete provider adapter. Placeholder. */
  private async sendViaProvider(provider: string, request: MessageRequest): Promise<void> {
    logger.info('Sending via provider', { provider, request })
    // Placeholder - would implement actual provider calls
  }
}

export const messagingOrchestratorService = new MessagingOrchestratorService()

View File

@@ -0,0 +1,218 @@
/**
* Phoenix Observability Stack Service
* Distributed tracing, structured logs, SLOs, correlation IDs
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
/** A distributed trace and its spans. */
export interface Trace {
  traceId: string
  correlationId: string
  spans: Span[]
  startTime: Date
  endTime: Date
  duration: number // milliseconds
}

/** One operation within a trace. */
export interface Span {
  spanId: string
  traceId: string
  parentSpanId: string | null // null for the root span
  serviceName: string
  operationName: string
  startTime: Date
  endTime: Date
  duration: number // milliseconds
  tags: Record<string, any>
  logs: LogEntry[]
}

/** A structured, correlation-tagged log line. */
export interface LogEntry {
  timestamp: Date
  level: 'DEBUG' | 'INFO' | 'WARN' | 'ERROR'
  message: string
  correlationId: string
  serviceName: string
  metadata: Record<string, any>
}

/** A service-level objective and its current standing. */
export interface SLO {
  sloId: string
  serviceName: string
  metricName: string
  target: number
  window: string
  currentValue: number
  status: 'HEALTHY' | 'WARNING' | 'BREACHED'
}

class ObservabilityService {
  /**
   * Start a new trace tied to a correlation id.
   * The returned Trace has no spans yet and endTime == startTime.
   */
  async createTrace(correlationId: string): Promise<Trace> {
    const db = getDb()
    const traceId = this.generateTraceId()

    const result = await db.query(
      `INSERT INTO traces (trace_id, correlation_id, start_time)
      VALUES ($1, $2, NOW())
      RETURNING *`,
      [traceId, correlationId]
    )

    logger.info('Trace created', { traceId, correlationId })
    return {
      traceId,
      correlationId,
      spans: [],
      startTime: result.rows[0].start_time,
      endTime: result.rows[0].start_time,
      duration: 0
    }
  }

  /**
   * Open a span under a trace. The span is persisted immediately with no end
   * time; call completeSpan to close it.
   */
  async addSpan(
    traceId: string,
    serviceName: string,
    operationName: string,
    parentSpanId: string | null,
    tags: Record<string, any> = {}
  ): Promise<Span> {
    const db = getDb()
    const spanId = this.generateSpanId()
    const startTime = new Date()

    // Result intentionally unused: the returned Span is built from the inputs.
    await db.query(
      `INSERT INTO spans (
        span_id, trace_id, parent_span_id, service_name, operation_name, start_time, tags
      ) VALUES ($1, $2, $3, $4, $5, $6, $7)
      RETURNING *`,
      [
        spanId,
        traceId,
        parentSpanId,
        serviceName,
        operationName,
        startTime,
        JSON.stringify(tags)
      ]
    )

    return {
      spanId,
      traceId,
      parentSpanId,
      serviceName,
      operationName,
      startTime,
      endTime: startTime, // open span: end mirrors start until completed
      duration: 0,
      tags,
      logs: []
    }
  }

  /**
   * Close a span, recording end time and duration in milliseconds.
   * NOTE(review): assumes the driver returns `start_time` as a Date — confirm
   * the pg type parser config.
   * @throws Error when the span id is unknown.
   */
  async completeSpan(spanId: string, endTime?: Date): Promise<void> {
    const db = getDb()

    const span = await db.query(
      `SELECT * FROM spans WHERE span_id = $1`,
      [spanId]
    )
    if (span.rows.length === 0) {
      throw new Error('Span not found')
    }

    const finishTime = endTime || new Date()
    const duration = finishTime.getTime() - span.rows[0].start_time.getTime()

    await db.query(
      `UPDATE spans SET end_time = $1, duration = $2 WHERE span_id = $3`,
      [finishTime, duration, spanId]
    )
  }

  /**
   * Persist a structured log line and mirror it to the process logger.
   */
  async log(
    level: 'DEBUG' | 'INFO' | 'WARN' | 'ERROR',
    message: string,
    correlationId: string,
    serviceName: string,
    metadata: Record<string, any> = {}
  ): Promise<void> {
    const db = getDb()

    await db.query(
      `INSERT INTO structured_logs (
        level, message, correlation_id, service_name, metadata, timestamp
      ) VALUES ($1, $2, $3, $4, $5, NOW())`,
      [level, message, correlationId, serviceName, JSON.stringify(metadata)]
    )

    // Narrow the index to the logger's known method names instead of an
    // untyped string index.
    const method = level.toLowerCase() as 'debug' | 'info' | 'warn' | 'error'
    logger[method](message, { correlationId, serviceName, ...metadata })
  }

  /**
   * Report an SLO's standing against its target, or null when undefined.
   */
  async getSLOStatus(serviceName: string, metricName: string): Promise<SLO | null> {
    const db = getDb()

    const result = await db.query(
      `SELECT * FROM slos WHERE service_name = $1 AND metric_name = $2`,
      [serviceName, metricName]
    )
    if (result.rows.length === 0) {
      return null
    }

    const row = result.rows[0]
    const currentValue = await this.getCurrentMetricValue(serviceName, metricName)
    const status = this.calculateSLOStatus(row.target, currentValue)

    return {
      sloId: row.id,
      serviceName: row.service_name,
      metricName: row.metric_name,
      target: parseFloat(row.target),
      window: row.window,
      currentValue,
      status
    }
  }

  /** Placeholder — would query the metrics backend. */
  private async getCurrentMetricValue(serviceName: string, metricName: string): Promise<number> {
    return 0.99 // Example: 99% uptime
  }

  /** HEALTHY at/above target, WARNING within 5% below it, else BREACHED. */
  private calculateSLOStatus(target: number, current: number): 'HEALTHY' | 'WARNING' | 'BREACHED' {
    if (current >= target) return 'HEALTHY'
    if (current >= target * 0.95) return 'WARNING'
    return 'BREACHED'
  }

  // NOTE(review): ids are time+Math.random, not W3C trace-context format —
  // confirm that is acceptable if these must interop with OpenTelemetry.
  private generateTraceId(): string {
    // slice() replaces the deprecated substr().
    return `trace_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }

  private generateSpanId(): string {
    return `span_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }
}

export const observabilityService = new ObservabilityService()

View File

@@ -0,0 +1,127 @@
/**
* Phoenix Transaction Orchestrator Service
* On-chain/off-chain workflow orchestration with retries and compensations
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
/** A multi-step on-chain/off-chain workflow and its lifecycle state. */
export interface Workflow {
  workflowId: string
  correlationId: string
  state: 'INITIATED' | 'AUTHORIZED' | 'CAPTURED' | 'SETTLED' | 'REVERSED' | 'FAILED'
  steps: WorkflowStep[]
  retryCount: number
  maxRetries: number
}

/** One step of a workflow; `compensation` names the undo action, if any. */
export interface WorkflowStep {
  stepId: string
  type: 'ON_CHAIN' | 'OFF_CHAIN'
  action: string
  status: 'PENDING' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED'
  retryCount: number
  compensation?: string
}

class TransactionOrchestratorService {
  /**
   * Create a workflow in state INITIATED with its steps PENDING.
   * A copy of the step definitions is also kept in the workflow's metadata.
   */
  async createWorkflow(
    correlationId: string,
    steps: Omit<WorkflowStep, 'stepId' | 'status' | 'retryCount'>[]
  ): Promise<Workflow> {
    const db = getDb()

    const result = await db.query(
      `INSERT INTO workflows (correlation_id, state, max_retries, metadata)
      VALUES ($1, 'INITIATED', 3, $2)
      RETURNING *`,
      [correlationId, JSON.stringify({ steps })]
    )
    const workflowId = result.rows[0].id

    // Persist each step as its own row.
    for (const step of steps) {
      await db.query(
        `INSERT INTO workflow_steps (workflow_id, type, action, status, compensation)
        VALUES ($1, $2, $3, 'PENDING', $4)`,
        [workflowId, step.type, step.action, step.compensation || null]
      )
    }

    logger.info('Workflow created', { workflowId, correlationId })
    return this.mapWorkflow(result.rows[0])
  }

  /**
   * Execute a single workflow step, transitioning it
   * IN_PROGRESS → COMPLETED, or → FAILED on error (the error is rethrown).
   */
  async executeStep(workflowId: string, stepId: string): Promise<void> {
    const db = getDb()

    await db.query(
      `UPDATE workflow_steps SET status = 'IN_PROGRESS' WHERE id = $1`,
      [stepId]
    )

    try {
      // Execute step logic here — this would route to the appropriate
      // provider adapter based on step type (ON_CHAIN / OFF_CHAIN).
      await db.query(
        `UPDATE workflow_steps SET status = 'COMPLETED' WHERE id = $1`,
        [stepId]
      )
      logger.info('Workflow step completed', { workflowId, stepId })
    } catch (error) {
      await db.query(
        `UPDATE workflow_steps SET status = 'FAILED' WHERE id = $1`,
        [stepId]
      )
      throw error
    }
  }

  /**
   * Retry a failed step, honoring the workflow's configured retry budget.
   *
   * Fixes from the previous version: an unknown step id crashed with an
   * opaque undefined-property error, and the limit was hard-coded to 3
   * instead of reading the workflow's `max_retries`.
   *
   * @throws Error when the step does not exist or the budget is exhausted.
   */
  async retryStep(workflowId: string, stepId: string): Promise<void> {
    const db = getDb()

    const step = await db.query(
      `SELECT * FROM workflow_steps WHERE id = $1`,
      [stepId]
    )
    if (step.rows.length === 0) {
      throw new Error(`Workflow step not found: ${stepId}`)
    }

    // Use the workflow's own budget; fall back to the default of 3.
    const wf = await db.query(
      `SELECT max_retries FROM workflows WHERE id = $1`,
      [workflowId]
    )
    const maxRetries = wf.rows[0]?.max_retries ?? 3

    if (step.rows[0].retry_count >= maxRetries) {
      throw new Error('Max retries exceeded')
    }

    await db.query(
      `UPDATE workflow_steps
      SET status = 'PENDING', retry_count = retry_count + 1
      WHERE id = $1`,
      [stepId]
    )

    await this.executeStep(workflowId, stepId)
  }

  /** Map a DB row to the public Workflow shape (steps loaded separately). */
  private mapWorkflow(row: any): Workflow {
    return {
      workflowId: row.id,
      correlationId: row.correlation_id,
      state: row.state,
      steps: [],
      retryCount: row.retry_count || 0,
      maxRetries: row.max_retries || 3
    }
  }
}

export const txOrchestratorService = new TransactionOrchestratorService()

View File

@@ -0,0 +1,141 @@
/**
* Phoenix Voice Orchestrator Service
* TTS/STT with caching, multi-provider routing, moderation
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
import crypto from 'crypto'
/**
 * Input for a text-to-speech synthesis request.
 */
export interface VoiceSynthesisRequest {
/** Raw text to synthesize; scrubPII is applied before the provider call. */
text: string
/** Voice profile identifier; part of the cache key. */
voiceProfile: string
/** Output audio container format; part of the cache key. */
format: 'mp3' | 'wav' | 'ogg'
/** Routing hint consumed by selectProvider to choose a TTS provider. */
latencyClass: 'LOW' | 'STANDARD' | 'HIGH_QUALITY'
}
/**
 * Result of a synthesis call, possibly served from the voice_cache table.
 */
export interface VoiceSynthesisResult {
/** Content hash identifying the rendered audio. */
audioHash: string
/** URL where the audio can be fetched. */
audioUrl: string
/** Audio length (units unspecified — TODO confirm; placeholder provider reports 0). */
duration: number
/** Provider that produced (or originally produced) the audio. */
provider: string
/** True when the result came from voice_cache rather than a fresh synthesis. */
cached: boolean
}
/**
 * Voice synthesis orchestrator: deterministic caching in `voice_cache`,
 * latency-class based provider routing, and PII scrubbing before provider calls.
 */
class VoiceOrchestratorService {
  /**
   * Synthesize speech for a request, serving from voice_cache when possible.
   * The cache key is sha256("text:voiceProfile:format"), so identical
   * requests reuse the stored audio.
   */
  async synthesizeVoice(request: VoiceSynthesisRequest): Promise<VoiceSynthesisResult> {
    const db = getDb()
    // Generate deterministic cache key
    const cacheKey = this.generateCacheKey(request.text, request.voiceProfile, request.format)
    // Check cache
    const cached = await db.query(
      `SELECT * FROM voice_cache WHERE cache_key = $1`,
      [cacheKey]
    )
    if (cached.rows.length > 0) {
      logger.info('Voice synthesis cache hit', { cacheKey })
      return {
        audioHash: cached.rows[0].audio_hash,
        audioUrl: cached.rows[0].audio_url,
        duration: cached.rows[0].duration,
        provider: cached.rows[0].provider,
        cached: true
      }
    }
    // Select provider based on latency class
    const provider = this.selectProvider(request.latencyClass)
    // NOTE(review): scrubbing runs after the cache lookup, so the cache is
    // keyed on the unscrubbed text — confirm this ordering is intended.
    const scrubbedText = this.scrubPII(request.text)
    // Synthesize via provider
    const synthesis = await this.synthesizeViaProvider(provider, {
      ...request,
      text: scrubbedText
    })
    // Fix: text_hash previously stored the composite cache key
    // (text + voiceProfile + format); store the hash of the text itself so
    // the column matches its name.
    const textHash = crypto.createHash('sha256').update(request.text).digest('hex')
    await db.query(
      `INSERT INTO voice_cache (cache_key, audio_hash, audio_url, duration, provider, text_hash)
      VALUES ($1, $2, $3, $4, $5, $6)`,
      [cacheKey, synthesis.audioHash, synthesis.audioUrl, synthesis.duration, provider, textHash]
    )
    logger.info('Voice synthesized', { cacheKey, provider })
    return {
      ...synthesis,
      provider,
      cached: false
    }
  }
  /**
   * Look up previously synthesized audio by its content hash.
   * @returns The cached result, or null when no row matches.
   */
  async getAudioByHash(hash: string): Promise<VoiceSynthesisResult | null> {
    const db = getDb()
    const result = await db.query(
      `SELECT * FROM voice_cache WHERE audio_hash = $1`,
      [hash]
    )
    if (result.rows.length === 0) {
      return null
    }
    const row = result.rows[0]
    return {
      audioHash: row.audio_hash,
      audioUrl: row.audio_url,
      duration: row.duration,
      provider: row.provider,
      cached: true
    }
  }
  /** Deterministic cache key: sha256 over "text:voiceProfile:format". */
  private generateCacheKey(text: string, voiceProfile: string, format: string): string {
    const hash = crypto.createHash('sha256')
    hash.update(`${text}:${voiceProfile}:${format}`)
    return hash.digest('hex')
  }
  /**
   * Placeholder PII scrub (emails, phone numbers, SSNs, etc.).
   * Currently a no-op.
   */
  private scrubPII(text: string): string {
    return text
  }
  /** Route by latency class; unknown classes fall back to elevenlabs. */
  private selectProvider(latencyClass: string): string {
    const providers: Record<string, string> = {
      LOW: 'elevenlabs',
      STANDARD: 'openai',
      HIGH_QUALITY: 'elevenlabs'
    }
    return providers[latencyClass] || 'elevenlabs'
  }
  /**
   * Placeholder provider call: returns a random hash/URL and duration 0.
   * A real implementation would dispatch to the named provider adapter.
   */
  private async synthesizeViaProvider(
    provider: string,
    request: VoiceSynthesisRequest
  ): Promise<Omit<VoiceSynthesisResult, 'provider' | 'cached'>> {
    logger.info('Synthesizing via provider', { provider, request })
    return {
      audioHash: crypto.randomBytes(32).toString('hex'),
      audioUrl: `https://cdn.sankofa.nexus/voice/${crypto.randomBytes(16).toString('hex')}.${request.format}`,
      duration: 0
    }
  }
}
export const voiceOrchestratorService = new VoiceOrchestratorService()

View File

@@ -0,0 +1,112 @@
/**
* Phoenix Wallet Registry Service
* Wallet mapping, chain support, policy engine, and recovery
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
/**
 * A registered wallet; maps to a row in the `wallets` table.
 */
export interface Wallet {
/** Primary key of the `wallets` row. */
walletId: string
/** Owning user id. */
userId: string
/** Owning organization, or null for personal wallets. */
orgId: string | null
/** On-chain account address. */
address: string
/** Numeric chain identifier the wallet lives on. */
chainId: number
/** Key custody model: user-held, shared, or platform-held. */
custodyType: 'USER' | 'SHARED' | 'PLATFORM'
/** Lifecycle status persisted in the `wallets.status` column. */
status: 'ACTIVE' | 'SUSPENDED' | 'RECOVERED'
}
/**
 * A transaction to build or simulate.
 */
export interface TransactionRequest {
/** Sender address. */
from: string
/** Recipient address. */
to: string
/** Transfer amount as a string (presumably base units, e.g. wei — TODO confirm). */
value: string
/** Optional calldata payload. */
data?: string
/** Chain the transaction targets. */
chainId: number
}
/**
 * Outcome of a transaction simulation.
 */
export interface TransactionSimulation {
/** Whether the simulated execution succeeded. */
success: boolean
/** Gas estimate as a string (simulateTransaction currently returns '21000'). */
gasEstimate: string
/** Failure reason when success is false. */
error?: string
/** Non-fatal findings surfaced by the simulation. */
warnings?: string[]
}
/**
 * Registry for user/org wallets across chains, plus placeholder transaction
 * build/simulate helpers.
 */
class WalletRegistryService {
  /**
   * Insert an ACTIVE wallet row and return the mapped record.
   *
   * @param custodyType Who controls the keys: USER, SHARED, or PLATFORM.
   * @param orgId Optional owning organization; stored as NULL when omitted.
   */
  async registerWallet(
    userId: string,
    address: string,
    chainId: number,
    custodyType: 'USER' | 'SHARED' | 'PLATFORM',
    orgId?: string
  ): Promise<Wallet> {
    const db = getDb()
    const result = await db.query(
      `INSERT INTO wallets (user_id, org_id, address, chain_id, custody_type, status)
      VALUES ($1, $2, $3, $4, $5, 'ACTIVE')
      RETURNING *`,
      [userId, orgId || null, address, chainId, custodyType]
    )
    logger.info('Wallet registered', { walletId: result.rows[0].id, address })
    return this.mapWallet(result.rows[0])
  }
  /**
   * Serialize a transaction with deterministic encoding.
   * Placeholder: returns '0x' until a transaction builder is integrated.
   */
  async buildTransaction(request: TransactionRequest): Promise<string> {
    logger.info('Building transaction', { request })
    return '0x' // Serialized transaction
  }
  /**
   * Simulate a transaction against the target chain.
   * Placeholder: always succeeds with a 21000 gas estimate until chain RPC
   * simulation is wired in.
   */
  async simulateTransaction(request: TransactionRequest): Promise<TransactionSimulation> {
    logger.info('Simulating transaction', { request })
    return {
      success: true,
      gasEstimate: '21000',
      warnings: []
    }
  }
  /**
   * List wallets owned by a user, optionally filtered to one chain.
   */
  async getWalletsForUser(userId: string, chainId?: number): Promise<Wallet[]> {
    const db = getDb()
    // Fix: use an explicit undefined check so chainId 0 is not silently
    // treated as "no filter" (the old truthiness check dropped it).
    const hasChainFilter = chainId !== undefined
    const query = hasChainFilter
      ? `SELECT * FROM wallets WHERE user_id = $1 AND chain_id = $2`
      : `SELECT * FROM wallets WHERE user_id = $1`
    const params = hasChainFilter ? [userId, chainId] : [userId]
    const result = await db.query(query, params)
    // Fix: previously passed this.mapWallet unbound to .map(), which loses
    // the `this` binding and forwards map's index/array arguments. Harmless
    // today only because mapWallet uses neither; make the call explicit.
    return result.rows.map((row) => this.mapWallet(row))
  }
  /** Map a `wallets` row to the Wallet shape. */
  private mapWallet(row: any): Wallet {
    return {
      walletId: row.id,
      userId: row.user_id,
      orgId: row.org_id,
      address: row.address,
      chainId: row.chain_id,
      custodyType: row.custody_type,
      status: row.status
    }
  }
}
export const walletRegistryService = new WalletRegistryService()