Veritas Stream
Veritas-stream is an immutable event store and governance backbone for multi-agent systems . Built on Event Sourcing and CQRS, it provides persistent memory to make AI decisions auditable and reproducible . It prevents agent context loss and provides the regulatory audit trail for production .
Ask AI about Veritas Stream
Powered by Claude Β· Grounded in docs
I know everything about Veritas Stream. Ask me about installation, configuration, usage, or troubleshooting.
0/500
Reviews
Documentation
VeritasStream: The Ledger
Event-sourcing infrastructure for Apex Financial Services' AI-driven loan processing platform.
The Ledger provides append-only event streams, aggregate replay, cryptographic audit chains, and an MCP server interface so AI agents can write commands and read projections through a single, LLM-consumable API.
System Architecture
flowchart TD
Agent([AI Agent / LLM]) -->|MCP Tool call| MCP[MCP Server<br/>ledger/mcp/]
Agent -->|MCP Resource read| MCP
Web[Next.js Dashboard<br/>web/] -->|HTTP REST| API[FastAPI REST API<br/>api/]
MCP -->|Commands| Handlers[Command Handlers<br/>ledger/commands/handlers.py]
API -->|Commands| Handlers
MCP -->|Query| Projections[Projections<br/>ledger/projections/]
API -->|Query| Projections
Handlers -->|append| ES[(EventStore<br/>PostgreSQL / InMemory)]
ES -->|load_all| Daemon[ProjectionDaemon]
Daemon --> AppSummary[ApplicationSummary]
Daemon --> ComplianceAudit[ComplianceAuditView]
Daemon --> AgentPerf[AgentPerformanceLedger]
Projections --> AppSummary
Projections --> ComplianceAudit
Projections --> AgentPerf
Handlers -->|load_stream| Aggregates[Domain Aggregates<br/>ledger/domain/aggregates/]
AuditChain[Audit Hash Chain<br/>ledger/integrity/] -->|SHA-256 chain| ES
GasTown[Gas Town<br/>ledger/integrity/] -->|reconstruct context| ES
Upcasting[Upcasters<br/>ledger/upcasting/] -->|v1βv2 on read| ES
Quick Start
1. Install dependencies (requires Python 3.12, Node.js 20+)
uv sync
cd web && bun install && cd ..
2. Configure environment
cp .env.example .env
# Edit .env and set:
# DATABASE_URL PostgreSQL connection string
# ANTHROPIC_API_KEY Anthropic API key (primary LLM)
# GEMINI_API_KEY Optional: Google Gemini via OpenRouter (fallback)
# APPLICANT_REGISTRY_URL Same DB as DATABASE_URL in the default setup
# DOCUMENTS_DIR Path to uploaded documents (default: ./documents)
3. Start the database
docker compose up -d
This starts PostgreSQL on port 5433 (host) mapped to 5432 (container).
4. Run database migrations
DATABASE_URL=postgresql://postgres:apex@localhost:5433/apex_ledger \
uv run psql -U postgres -h localhost -p 5433 -d apex_ledger -f schema.sql
5. Run all tests
# In-memory tests (no database required)
uv run pytest tests/ --ignore=tests/test_schema.py --ignore=tests/test_event_store.py -v
# Schema + EventStore tests (requires PostgreSQL on port 5433)
DATABASE_URL=postgresql://postgres:apex@localhost:5433/apex_ledger \
uv run pytest tests/test_schema.py tests/test_event_store.py -v
6. Start the full stack (API + web dashboard)
make dev
This builds the Next.js app if needed, then starts both services concurrently:
- FastAPI REST API on http://localhost:8000 (with live projection daemon)
- Next.js dashboard on http://localhost:3000
To start services individually:
# API only
uv run uvicorn api.main:app --reload --port 8000
# Web only (requires the API to be running)
cd web && bun start
8. Run the MCP server
The server is started by running scripts/run_pipeline.py or by embedding the
server factory in your own script. It binds to 0.0.0.0:8000 (stdio transport
by default when invoked via FastMCP mcp.run(); HTTP transport available via
mcp.run(transport="http", port=8000)).
from ledger.event_store import EventStore
from ledger.projections.application_summary import ApplicationSummaryProjection
from ledger.projections.compliance_audit import ComplianceAuditViewProjection
from ledger.projections.agent_performance import AgentPerformanceLedgerProjection
from ledger.projections.daemon import ProjectionDaemon
from ledger.mcp.server import create_mcp_server
store = EventStore("postgresql://postgres:apex@localhost:5433/apex_ledger")
summary = ApplicationSummaryProjection()
compliance = ComplianceAuditViewProjection()
perf = AgentPerformanceLedgerProjection()
daemon = ProjectionDaemon(store, [summary, compliance, perf])
mcp = create_mcp_server(store, daemon, {
"summary": summary,
"compliance": compliance,
"agent_performance": perf,
})
mcp.run()
To verify the server is accepting connections (HTTP transport):
curl http://localhost:8000/health
# Expected: {"status":"ok"}
9. Query MCP resources
Resources are read-side projections. Query them by URI once the server is running:
from fastmcp import Client
async with Client("http://localhost:8000") as client:
# Application summary (current state)
summary = await client.read_resource("ledger://applications/APEX-0001")
# Compliance state at a specific point in time
compliance = await client.read_resource(
"ledger://applications/APEX-0001/compliance/2026-03-01T10:00:00"
)
# Full audit trail (stream replay)
trail = await client.read_resource("ledger://applications/APEX-0001/audit-trail")
# Projection daemon health and lag per projection (milliseconds)
health = await client.read_resource("ledger://ledger/health")
Event Flow: Full Loan Lifecycle
sequenceDiagram
participant Client
participant MCP as MCP Server
participant Handlers as Command Handlers
participant ES as EventStore
participant Daemon as ProjectionDaemon
Client->>MCP: submit_application
MCP->>Handlers: handle_submit_application
Handlers->>ES: append("loan-{id}", ApplicationSubmitted)
Client->>MCP: start_agent_session (credit)
MCP->>Handlers: handle_start_agent_session
Handlers->>ES: append("agent-credit-{sess}", AgentSessionStarted + AgentInputValidated)
Client->>MCP: record_credit_analysis
MCP->>Handlers: handle_credit_analysis_completed
Handlers->>ES: append("loan-{id}", CreditAnalysisCompleted)
Client->>MCP: record_fraud_screening
Client->>MCP: record_compliance_check (ΓN rules)
Client->>MCP: generate_decision
MCP->>Handlers: handle_generate_decision
Handlers->>ES: append("loan-{id}", DecisionGenerated)
Client->>MCP: record_human_review
Handlers->>ES: append("loan-{id}", HumanReviewCompleted)
ES-->>Daemon: load_all (poll)
Daemon-->>Client: ledger://applications/{id} β FinalApproved
Aggregate State Machine
stateDiagram-v2
[*] --> SUBMITTED: ApplicationSubmitted
SUBMITTED --> DOCUMENTS_PENDING: DocumentsRequested
DOCUMENTS_PENDING --> DOCUMENTS_UPLOADED: DocumentsUploaded
DOCUMENTS_UPLOADED --> DOCUMENTS_PROCESSED: DocumentsProcessed
DOCUMENTS_PROCESSED --> UNDER_REVIEW: UnderReview
UNDER_REVIEW --> PENDING_DECISION: DecisionGenerated (APPROVE/DECLINE)
UNDER_REVIEW --> PENDING_HUMAN_REVIEW: DecisionGenerated (REFER)
PENDING_HUMAN_REVIEW --> APPROVED: HumanReviewCompleted (APPROVED)
PENDING_HUMAN_REVIEW --> DECLINED: HumanReviewCompleted (DECLINED)
PENDING_DECISION --> APPROVED: ApplicationApproved
PENDING_DECISION --> DECLINED: ApplicationDeclined
APPROVED --> [*]
DECLINED --> [*]
Test Suite
# Branch 1: Schema (requires PostgreSQL)
pytest tests/test_schema.py -v
# Branch 2: Domain aggregates + business rules
pytest tests/test_aggregates.py -v
# Branch 3: Command handlers + lifecycle
pytest tests/test_command_handlers.py -v
# Branch 4: Projections + daemon
pytest tests/test_projections.py -v
# Branch 5: Upcasting + integrity
pytest tests/test_upcasting.py tests/test_integrity.py -v
# Branch 6: MCP integration
pytest tests/test_mcp_integration.py -v
# Required submission test files
pytest tests/test_concurrency.py tests/test_gas_town.py tests/test_mcp_lifecycle.py -v
# All in-memory tests at once
pytest tests/ --ignore=tests/test_schema.py --ignore=tests/test_event_store.py -v
Coverage by branch:
| Branch | Test file | Requirements proven |
|---|---|---|
| 1 | test_schema.py | Append-only constraint, outbox FK, identity-based global ordering |
| 2 | test_aggregates.py | All 6 named business rules (state machine, Gas Town, model lock, confidence floor, compliance dependency, causal chain) |
| 3 | test_command_handlers.py | Double-Decision Test, Gas Town enforcement, full lifecycle, OCC losers get typed error |
| 4 | test_projections.py | ApplicationSummary SLO < 500 ms, ComplianceAuditView SLO < 2 s, temporal query, rebuild determinism, daemon fault tolerance |
| 5 | test_upcasting.py | Mandatory TRP immutability, null-over-fabrication for unknown fields |
| 5 | test_integrity.py | Tamper detection, Gas Town crash recovery, NEEDS_RECONCILIATION on partial state |
| 6 | test_mcp_integration.py | Full lifecycle via MCP only, structured error types, projection-backed resources, health watchdog |
| Submission | test_concurrency.py | OCC double-decision: exactly one winner, one OptimisticConcurrencyError |
| Submission | test_gas_town.py | Agent crash recovery, partial decision flagged as NEEDS_RECONCILIATION |
| Submission | test_mcp_lifecycle.py | Full loan lifecycle driven exclusively via MCP tool calls |
MCP Tools and Resources
Tools (commands, write side)
| Tool | Description |
|---|---|
submit_application | Submit a new loan application |
start_agent_session | Open an agent session (Gas Town contract) |
record_credit_analysis | Record credit analysis result |
record_fraud_screening | Record fraud screening result |
record_compliance_check | Record a compliance rule evaluation |
generate_decision | Generate final lending decision |
record_human_review | Record human reviewer outcome |
run_integrity_check | Run cryptographic audit hash-chain check |
Resources (projections, read side)
| Resource URI | Description |
|---|---|
ledger://applications/{id} | ApplicationSummary read model |
ledger://applications/{id}/compliance | Current compliance state |
ledger://applications/{id}/compliance/{as_of} | Temporal compliance query |
ledger://applications/{id}/audit-trail | Full event history (stream replay) |
ledger://agents/{agent_id}/performance | Agent performance statistics |
ledger://agents/{agent_id}/sessions/{session_id} | Gas Town session reconstruction |
ledger://ledger/health | Projection daemon lag report |
Project Structure
veritas-stream/
βββ ledger/
β βββ event_store.py # EventStore (PostgreSQL) + InMemoryEventStore
β βββ commands/
β β βββ handlers.py # 7 command handlers (load β validate β append)
β βββ domain/
β β βββ aggregates/
β β βββ loan_application.py # LoanApplicationAggregate (13 event types, 6 rules)
β β βββ agent_session.py # AgentSessionAggregate (Gas Town)
β β βββ compliance_record.py
β β βββ audit_ledger.py
β βββ projections/
β β βββ daemon.py # ProjectionDaemon (checkpoint-aware fan-out)
β β βββ application_summary.py # ApplicationSummaryProjection
β β βββ compliance_audit.py # ComplianceAuditViewProjection (temporal queries)
β β βββ agent_performance.py # AgentPerformanceLedgerProjection
β βββ upcasting/
β β βββ registry.py # UpcasterRegistry (immutability contract)
β β βββ upcasters.py # CreditAnalysisCompleted v1βv2, DecisionGenerated v1βv2
β βββ integrity/
β β βββ audit_chain.py # SHA-256 hash chain tamper detection
β β βββ gas_town.py # reconstruct_agent_context (crash recovery)
β βββ mcp/
β βββ server.py # create_mcp_server() factory
β βββ tools.py # 8 MCP tools
β βββ resources.py # 7 MCP resources
βββ api/
β βββ main.py # FastAPI app factory with lifespan (daemon startup)
β βββ dependencies.py # Shared FastAPI dependencies (store, daemon, projections)
β βββ routers/ # REST endpoints: applications, agents, compliance, pipeline
β βββ models/ # Pydantic request/response models
β βββ services/ # Business logic called by routers
βββ web/
β βββ app/ # Next.js App Router pages
β β βββ applications/ # Application list and detail views
β β βββ compliance/ # Compliance audit dashboard
β β βββ documents/ # Document upload and review
β β βββ analytics/ # Agent performance analytics
β β βββ whatif/ # Counterfactual what-if projector
β βββ components/ # Shared React components
βββ scripts/
β βββ run_pipeline.py # End-to-end pipeline runner
β βββ rebuild_projections.py # Rebuild projections from scratch
β βββ demo_narr05.py # NARR-05 crash recovery demo
βββ tests/
β βββ test_schema.py
β βββ test_aggregates.py
β βββ test_command_handlers.py
β βββ test_projections.py
β βββ test_upcasting.py
β βββ test_integrity.py
β βββ test_mcp_integration.py
β βββ test_concurrency.py # OCC double-decision (submission requirement)
β βββ test_gas_town.py # crash recovery (submission requirement)
β βββ test_mcp_lifecycle.py # MCP-only lifecycle (submission requirement)
βββ what_if/
β βββ projector.py # run_what_if(): counterfactual projection
βββ regulatory/
β βββ package.py # generate_regulatory_package()
βββ schema.sql # PostgreSQL DDL (events, event_streams, projections, outbox)
βββ docker-compose.yml # PostgreSQL 16 on port 5433
βββ Makefile # dev (api+web), pdf (report rendering), clean-pdf
βββ pyproject.toml
