Vitruvyan
Docs
Knowledge_baseOculus_prime

Vitruvyan Docs

🌐 Oculus Prime Flow Graph (Streams-Native)

Updated: February 20, 2026
Context: Complete end-to-end pipeline with all Sacred Orders
Scope: Full epistemic flow from acquisition to archival

🎯 Purpose

This document describes the canonical Oculus Prime flow after migration to Redis Streams.

Key constraints:

  1. Oculus Prime is the single edge gateway for H2M/M2M ingestion.
  2. Oculus Prime performs acquisition, normalization, and immutable persistence.
  3. Oculus Prime emits oculus_prime.evidence.created via StreamBus (legacy alias supported).
  4. Semantic enrichment starts downstream (Codex Hunters and beyond).

🔄 End-to-End Flow

flowchart TD
    %% External Input
    A[External Source<br/>H2M / M2M] --> B[Oculus Prime API<br/>POST /api/intake/:type]
    
    %% Oculus Prime Layer (PERCEPTION)
    B --> C[Media Intake Agent<br/>document/image/audio/video<br/>landscape/geo/cad]
    C --> D[Evidence Pack Builder<br/>literal normalized_text]
    D --> E1[(PostgreSQL<br/>evidence_packs<br/>append-only)]
    D --> F[IntakeEventEmitter]
    F --> E2[(PostgreSQL<br/>intake_event_log<br/>intake_event_failures)]
    F --> G1[[StreamBus<br/>oculus_prime.evidence.created]]
    F -.legacy.-> G2[[StreamBus<br/>intake.evidence.created]]
    
    %% Codex Hunters Layer (PERCEPTION → MEMORY)
    G1 --> H1[Codex Hunters Listener<br/>Consumer Group: codex_hunters]
    H1 --> H2[TrackerConsumer<br/>Discovery Pipeline]
    H2 --> H3[RestorerConsumer<br/>Data Normalization]
    H3 --> H4[BinderConsumer<br/>Entity Binding]
    H4 --> H5[(PostgreSQL<br/>entities<br/>entity_metadata)]
    H4 --> H6[Embedding API<br/>GET /embed]
    H6 --> H7[(Qdrant<br/>codex_entities<br/>vector storage)]
    H4 --> H8[[StreamBus<br/>codex.entity.discovered]]
    H4 --> H9[[StreamBus<br/>codex.entity.restored]]
    H4 --> H10[[StreamBus<br/>codex.entity.bound]]
    
    %% Pattern Weavers Layer (REASON)
    H8 --> I1[Pattern Weavers Listener<br/>Consumer Group: pattern_weavers]
    H10 --> I1
    I1 --> I2[WeaverConsumer<br/>Ontology Mapping]
    I2 --> I3[KeywordMatcherConsumer<br/>Pattern Classification]
    I3 --> I4[(PostgreSQL<br/>patterns<br/>ontology_mappings)]
    I3 --> I5[Embedding API<br/>GET /embed]
    I5 --> I6[(Qdrant<br/>pattern_taxonomy<br/>vector storage)]
    I3 --> I7[[StreamBus<br/>pattern.weave.response]]
    I3 --> I8[[StreamBus<br/>pattern.taxonomy.updated]]
    
    %% Babel Gardens Layer (DISCOURSE - Parallel Processing)
    H8 -.optional.-> J1[Babel Gardens Listener<br/>Consumer Group: babel_gardens]
    J1 --> J2[SynthesisConsumer<br/>Language Processing]
    J2 --> J3[TopicClassifierConsumer<br/>Topic Extraction]
    J3 --> J4[LanguageDetectorConsumer<br/>Language Detection]
    J4 --> J5[(Qdrant<br/>babel_embeddings<br/>vector storage)]
    J4 --> J6[[StreamBus<br/>babel.embedding.response]]
    J4 --> J7[[StreamBus<br/>babel.emotion.response]]
    J4 --> J8[[StreamBus<br/>babel.topic.response]]
    
    %% Memory Orders Layer (MEMORY - Coherence Monitor)
    I7 --> K1[Memory Orders<br/>Coherence Monitor]
    I8 --> K1
    K1 --> K2[CoherenceAnalyzer<br/>PostgreSQL ↔ Qdrant Sync]
    K2 --> K3[HealthAggregator<br/>System Health Check]
    K3 --> K4[SyncPlanner<br/>Drift Resolution]
    K4 --> K5[(PostgreSQL<br/>coherence_reports<br/>health_metrics)]
    K4 --> K6[[StreamBus<br/>memory.coherence.checked]]
    K4 --> K7[[StreamBus<br/>memory.health.checked]]
    K4 --> K8[[StreamBus<br/>memory.sync.completed]]
    
    %% Orthodoxy Wardens Layer (TRUTH - Governance)
    I7 -.audit.-> L1[Orthodoxy Wardens<br/>Governance Engine]
    K6 -.audit.-> L1
    L1 --> L2[QualityAuditor<br/>Data Quality Check]
    L2 --> L3[ComplianceValidator<br/>Policy Enforcement]
    L3 --> L4[(PostgreSQL<br/>audit_logs<br/>compliance_reports)]
    L3 --> L5[[StreamBus<br/>orthodoxy.audit.completed]]
    
    %% Vault Keepers Layer (TRUTH - Archival)
    L5 --> M1[Vault Keepers Listener<br/>Consumer Group: vault_keepers]
    K8 --> M1
    M1 --> M2[ArchiveConsumer<br/>Evidence Archival]
    M2 --> M3[SnapshotConsumer<br/>State Snapshots]
    M3 --> M4[IntegrityConsumer<br/>Hash Verification]
    M4 --> M5[(PostgreSQL<br/>vault_archives<br/>snapshots<br/>integrity_checks)]
    M4 --> M6[[StreamBus<br/>vault.archive.completed]]
    M4 --> M7[[StreamBus<br/>vault.snapshot.created]]
    M4 --> M8[[StreamBus<br/>vault.integrity.validated]]
    
    %% Styling
    classDef pgDB fill:#336791,stroke:#23527c,stroke-width:2px,color:#fff
    classDef qdrantDB fill:#DC244C,stroke:#991732,stroke-width:2px,color:#fff
    classDef streamBus fill:#D82C20,stroke:#a61c13,stroke-width:2px,color:#fff
    classDef perception fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
    classDef reason fill:#2196F3,stroke:#0D47A1,stroke-width:2px,color:#fff
    classDef memory fill:#9C27B0,stroke:#6A1B9A,stroke-width:2px,color:#fff
    classDef truth fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
    classDef discourse fill:#00BCD4,stroke:#006064,stroke-width:2px,color:#fff
    
    class E1,E2,H5,I4,K5,L4,M5 pgDB
    class H7,I6,J5 qdrantDB
    class G1,G2,H8,H9,H10,I7,I8,J6,J7,J8,K6,K7,K8,L5,M6,M7,M8 streamBus
    class C,D,F,H1,H2,H3,H4 perception
    class I1,I2,I3 reason
    class K1,K2,K3,K4 memory
    class L1,L2,L3,M1,M2,M3,M4 truth
    class J1,J2,J3,J4 discourse

Responsibility Boundary

Oculus Prime MUST:

  1. Create immutable Evidence Packs.
  2. Emit stream events and audit logs.
  3. Stay media-agnostic and domain-agnostic in acquisition behavior.

Oculus Prime MUST NOT:

  1. Run NER, embeddings, ontology mapping.
  2. Decide semantic relevance.
  3. Call downstream cognitive services directly.
  4. Read downstream cognitive tables.

📊 Pipeline Legend

Color Coding by Sacred Order

  • 🟢 Green (Perception): Oculus Prime + Codex Hunters — Acquisition & Discovery
  • 🔵 Blue (Reason): Pattern Weavers — Ontology & Classification
  • 🟣 Purple (Memory): Memory Orders — Coherence & Sync
  • 🟠 Orange (Truth): Orthodoxy Wardens + Vault Keepers — Governance & Archival
  • 🔵 Cyan (Discourse): Babel Gardens — Language Processing (parallel)

Component Types

  • 🗄️ Dark Blue Cylinders: PostgreSQL (structured storage)
  • 🔴 Red Cylinders: Qdrant (vector embeddings)
  • 🔴 Red Boxes: Redis Streams (StreamBus event channels)

🎯 Detailed Pipeline Flow

1️⃣ PERCEPTION Layer: Oculus Prime (Intake)

Responsibilities:

  • Accept file uploads from external sources (documents, images, audio, video, geo, CAD)
  • Extract literal text (NO semantic interpretation)
  • Create immutable Evidence Packs
  • Emit standardized events

PostgreSQL Tables:

  • evidence_packs — Append-only evidence storage
  • intake_event_log — Audit trail of all emitted events
  • intake_event_failures — Failed emission diagnostics

StreamBus Events Emitted:

  • oculus_prime.evidence.created (v2 canonical)
  • intake.evidence.created (v1 legacy alias, dual-write mode)

Event Payload:

{
  "event_id": "EVT-{UUID}",
  "evidence_id": "EVD-{UUID}",
  "chunk_id": "CHK-{N}",
  "source_type": "document|image|audio|video|stream|sensor",
  "source_uri": "/uploads/file.ext",
  "source_hash": "sha256:...",
  "byte_size": 1024000,
  "language_detected": "en",
  "sampling_policy_ref": "SAMPPOL-DOC-DEFAULT-V1"
}

2️⃣ PERCEPTION → MEMORY: Codex Hunters (Discovery & Enrichment)

Responsibilities:

  • Consume evidence created events
  • Perform discovery (entity recognition)
  • Restore data (normalization)
  • Bind entities (relationship linking)
  • Generate embeddings for semantic search

Consumer Group: codex_hunters

Consumed Events:

  • oculus_prime.evidence.created (primary)
  • intake.evidence.created (legacy fallback)

Processing Pipeline:

  1. TrackerConsumer — Discovery pipeline (entity detection)
  2. RestorerConsumer — Data normalization & quality assessment
  3. BinderConsumer — Entity relationship binding

PostgreSQL Tables:

  • entities — Discovered entities with metadata
  • entity_metadata — Additional entity attributes

Qdrant Collections:

  • codex_entities — Vector embeddings for entities (via Embedding API)

StreamBus Events Emitted:

  • codex.entity.discovered — New entity found
  • codex.entity.restored — Data normalized
  • codex.entity.bound — Relationships established

3️⃣ REASON Layer: Pattern Weavers (Ontology Mapping)

Responsibilities:

  • Map entities to domain ontologies
  • Extract concepts and relationships
  • Classify entities into taxonomies
  • Enable semantic pattern matching

Consumer Group: pattern_weavers

Consumed Events:

  • codex.entity.discovered
  • codex.entity.bound

Processing Pipeline:

  1. WeaverConsumer — Ontology mapping & concept extraction
  2. KeywordMatcherConsumer — Pattern classification & keyword matching

PostgreSQL Tables:

  • patterns — Detected patterns and relationships
  • ontology_mappings — Entity-to-ontology bindings

Qdrant Collections:

  • pattern_taxonomy — Vector embeddings for patterns (via Embedding API)

StreamBus Events Emitted:

  • pattern.weave.response — Pattern weaving completed
  • pattern.taxonomy.updated — Taxonomy changes

4️⃣ DISCOURSE Layer: Babel Gardens (Language Processing — Parallel)

Responsibilities:

  • Language detection and translation
  • Emotion and sentiment analysis
  • Topic classification
  • Linguistic synthesis

Consumer Group: babel_gardens

Consumed Events (optional, parallel to Pattern Weavers):

  • codex.entity.discovered

Processing Pipeline:

  1. SynthesisConsumer — Language processing & synthesis
  2. TopicClassifierConsumer — Topic extraction
  3. LanguageDetectorConsumer — Language identification

Qdrant Collections:

  • babel_embeddings — Linguistic embeddings for multilingual semantic search

StreamBus Events Emitted:

  • babel.embedding.response — Embedding generation completed
  • babel.emotion.response — Emotion analysis result
  • babel.topic.response — Topic classification result

5️⃣ MEMORY Layer: Memory Orders (Coherence Monitor)

Responsibilities:

  • Monitor dual-memory coherence (PostgreSQL ↔ Qdrant)
  • Detect drift and synchronization issues
  • Aggregate system health metrics
  • Plan and execute sync operations

Processing Pipeline:

  1. CoherenceAnalyzer — Compare PostgreSQL vs Qdrant counts
  2. HealthAggregator — Collect health metrics from all components
  3. SyncPlanner — Generate drift resolution plans

PostgreSQL Tables:

  • coherence_reports — Coherence check audit trail
  • health_metrics — System health snapshots

StreamBus Events Emitted:

  • memory.coherence.checked — Coherence check completed
  • memory.health.checked — System health report
  • memory.sync.completed — Sync operation finished

Event Payload Example:

{
  "status": "healthy|warning|critical",
  "pg_count": 10000,
  "qdrant_count": 9950,
  "drift_percentage": 0.5,
  "drift_absolute": 50,
  "table": "entities",
  "collection": "codex_entities"
}

6️⃣ TRUTH Layer: Orthodoxy Wardens (Governance)

Responsibilities:

  • Data quality auditing
  • Policy compliance validation
  • Schema validation
  • Integrity enforcement

Processing Pipeline:

  1. QualityAuditor — Assess data quality metrics
  2. ComplianceValidator — Enforce governance policies

PostgreSQL Tables:

  • audit_logs — Comprehensive audit trail
  • compliance_reports — Policy violation tracking

StreamBus Events Emitted:

  • orthodoxy.audit.completed — Audit finished (consumed by Vault Keepers)

7️⃣ TRUTH Layer: Vault Keepers (Archival & Persistence)

Responsibilities:

  • Archive evidence and enriched metadata
  • Create state snapshots
  • Verify data integrity (hash validation)
  • Manage backup and recovery

Consumer Group: vault_keepers

Consumed Events:

  • orthodoxy.audit.completed — Audit events for archival
  • memory.sync.completed — Sync events for snapshot triggers

Processing Pipeline:

  1. ArchiveConsumer — Evidence archival
  2. SnapshotConsumer — State snapshot creation
  3. IntegrityConsumer — Hash verification & integrity checks

PostgreSQL Tables:

  • vault_archives — Archived evidence and metadata
  • snapshots — Point-in-time state snapshots
  • integrity_checks — Hash validation records

StreamBus Events Emitted:

  • vault.archive.completed — Archival finished
  • vault.snapshot.created — Snapshot saved
  • vault.integrity.validated — Integrity check passed

🔑 Key Integration Points

Who Embeds to Qdrant?

  1. Codex Hunterscodex_entities collection (via Embedding API)
  2. Pattern Weaverspattern_taxonomy collection (via Embedding API)
  3. Babel Gardensbabel_embeddings collection (direct QdrantAgent)

Who Logs to PostgreSQL?

  1. Oculus Primeevidence_packs, intake_event_log, intake_event_failures
  2. Codex Huntersentities, entity_metadata, entity_relations (ontological edges from FK data)
  3. Pattern Weaverspatterns, ontology_mappings, entity_relations (ontological edges from LLM)
  4. Memory Orderscoherence_reports, health_metrics
  5. Orthodoxy Wardensaudit_logs, compliance_reports
  6. Vault Keepersvault_archives, snapshots, integrity_checks

Event Flow Summary

oculus_prime.evidence.created

codex.entity.discovered → codex.entity.restored → codex.entity.bound
  ↓                                                    ↓
pattern.weave.response ← pattern.taxonomy.updated     babel.embedding.response
  ↓                                                    ↓
memory.coherence.checked → memory.health.checked → memory.sync.completed
  ↓                                                    ↓
orthodoxy.audit.completed ────────────────────────→ vault.archive.completed

                                            vault.snapshot.created

                                            vault.integrity.validated

🚀 Migration & Compatibility

Event Naming Migration (v1 → v2)

  • v1 Legacy: intake.evidence.created
  • v2 Canonical: oculus_prime.evidence.created
  • Migration Mode: Controlled by OCULUS_PRIME_EVENT_MIGRATION_MODE env var
    • v1_only — Emit only legacy channel
    • v2_only — Emit only canonical channel
    • dual_write — Emit both (default for gradual rollout)

Consumer Configuration

Codex Hunters listeners support:

  • OCULUS_PRIME_EVENT_MIGRATION_MODE — Which channel(s) Oculus Prime emits
  • CODEX_OCULUS_CONSUME_LEGACY_ALIAS — Whether to consume legacy intake.* channel

📚 References

  • Oculus Prime Implementation: /infrastructure/edge/oculus_prime/
  • Event Emitter: /infrastructure/edge/oculus_prime/core/event_emitter.py
  • Codex Hunters Consumer: /services/api_codex_hunters/streams_listener.py
  • Pattern Weavers Consumer: /services/api_pattern_weavers/adapters/bus_adapter.py
  • Memory Orders Coherence: /services/api_memory_orders/adapters/bus_adapter.py
  • Vault Keepers Archival: /services/api_vault_keepers/adapters/bus_adapter.py
  • StreamBus Transport: /vitruvyan_core/core/synaptic_conclave/transport/streams.py