Vitruvyan Docs
🌐 Oculus Prime Flow Graph (Streams-Native)
Updated: February 20, 2026
Context: Complete end-to-end pipeline with all Sacred Orders
Scope: Full epistemic flow from acquisition to archival
🎯 Purpose
This document describes the canonical Oculus Prime flow after migration to Redis Streams.
Key constraints:
- Oculus Prime is the single edge gateway for H2M/M2M ingestion.
- Oculus Prime performs acquisition, normalization, and immutable persistence.
- Oculus Prime emits
oculus_prime.evidence.createdvia StreamBus (legacy alias supported). - Semantic enrichment starts downstream (Codex Hunters and beyond).
🔄 End-to-End Flow
Responsibility Boundary
Oculus Prime MUST:
- Create immutable Evidence Packs.
- Emit stream events and audit logs.
- Stay media-agnostic and domain-agnostic in acquisition behavior.
Oculus Prime MUST NOT:
- Run NER, embeddings, ontology mapping.
- Decide semantic relevance.
- Call downstream cognitive services directly.
- Read downstream cognitive tables.
📊 Pipeline Legend
Color Coding by Sacred Order
- 🟢 Green (Perception): Oculus Prime + Codex Hunters — Acquisition & Discovery
- 🔵 Blue (Reason): Pattern Weavers — Ontology & Classification
- 🟣 Purple (Memory): Memory Orders — Coherence & Sync
- 🟠 Orange (Truth): Orthodoxy Wardens + Vault Keepers — Governance & Archival
- 🔵 Cyan (Discourse): Babel Gardens — Language Processing (parallel)
Component Types
- 🗄️ Dark Blue Cylinders: PostgreSQL (structured storage)
- 🔴 Red Cylinders: Qdrant (vector embeddings)
- 🔴 Red Boxes: Redis Streams (StreamBus event channels)
🎯 Detailed Pipeline Flow
1️⃣ PERCEPTION Layer: Oculus Prime (Intake)
Responsibilities:
- Accept file uploads from external sources (documents, images, audio, video, geo, CAD)
- Extract literal text (NO semantic interpretation)
- Create immutable Evidence Packs
- Emit standardized events
PostgreSQL Tables:
evidence_packs— Append-only evidence storageintake_event_log— Audit trail of all emitted eventsintake_event_failures— Failed emission diagnostics
StreamBus Events Emitted:
oculus_prime.evidence.created(v2 canonical)intake.evidence.created(v1 legacy alias, dual-write mode)
Event Payload:
2️⃣ PERCEPTION → MEMORY: Codex Hunters (Discovery & Enrichment)
Responsibilities:
- Consume evidence created events
- Perform discovery (entity recognition)
- Restore data (normalization)
- Bind entities (relationship linking)
- Generate embeddings for semantic search
Consumer Group: codex_hunters
Consumed Events:
oculus_prime.evidence.created(primary)intake.evidence.created(legacy fallback)
Processing Pipeline:
- TrackerConsumer — Discovery pipeline (entity detection)
- RestorerConsumer — Data normalization & quality assessment
- BinderConsumer — Entity relationship binding
PostgreSQL Tables:
entities— Discovered entities with metadataentity_metadata— Additional entity attributes
Qdrant Collections:
codex_entities— Vector embeddings for entities (via Embedding API)
StreamBus Events Emitted:
codex.entity.discovered— New entity foundcodex.entity.restored— Data normalizedcodex.entity.bound— Relationships established
3️⃣ REASON Layer: Pattern Weavers (Ontology Mapping)
Responsibilities:
- Map entities to domain ontologies
- Extract concepts and relationships
- Classify entities into taxonomies
- Enable semantic pattern matching
Consumer Group: pattern_weavers
Consumed Events:
codex.entity.discoveredcodex.entity.bound
Processing Pipeline:
- WeaverConsumer — Ontology mapping & concept extraction
- KeywordMatcherConsumer — Pattern classification & keyword matching
PostgreSQL Tables:
patterns— Detected patterns and relationshipsontology_mappings— Entity-to-ontology bindings
Qdrant Collections:
pattern_taxonomy— Vector embeddings for patterns (via Embedding API)
StreamBus Events Emitted:
pattern.weave.response— Pattern weaving completedpattern.taxonomy.updated— Taxonomy changes
4️⃣ DISCOURSE Layer: Babel Gardens (Language Processing — Parallel)
Responsibilities:
- Language detection and translation
- Emotion and sentiment analysis
- Topic classification
- Linguistic synthesis
Consumer Group: babel_gardens
Consumed Events (optional, parallel to Pattern Weavers):
codex.entity.discovered
Processing Pipeline:
- SynthesisConsumer — Language processing & synthesis
- TopicClassifierConsumer — Topic extraction
- LanguageDetectorConsumer — Language identification
Qdrant Collections:
babel_embeddings— Linguistic embeddings for multilingual semantic search
StreamBus Events Emitted:
babel.embedding.response— Embedding generation completedbabel.emotion.response— Emotion analysis resultbabel.topic.response— Topic classification result
5️⃣ MEMORY Layer: Memory Orders (Coherence Monitor)
Responsibilities:
- Monitor dual-memory coherence (PostgreSQL ↔ Qdrant)
- Detect drift and synchronization issues
- Aggregate system health metrics
- Plan and execute sync operations
Processing Pipeline:
- CoherenceAnalyzer — Compare PostgreSQL vs Qdrant counts
- HealthAggregator — Collect health metrics from all components
- SyncPlanner — Generate drift resolution plans
PostgreSQL Tables:
coherence_reports— Coherence check audit trailhealth_metrics— System health snapshots
StreamBus Events Emitted:
memory.coherence.checked— Coherence check completedmemory.health.checked— System health reportmemory.sync.completed— Sync operation finished
Event Payload Example:
6️⃣ TRUTH Layer: Orthodoxy Wardens (Governance)
Responsibilities:
- Data quality auditing
- Policy compliance validation
- Schema validation
- Integrity enforcement
Processing Pipeline:
- QualityAuditor — Assess data quality metrics
- ComplianceValidator — Enforce governance policies
PostgreSQL Tables:
audit_logs— Comprehensive audit trailcompliance_reports— Policy violation tracking
StreamBus Events Emitted:
orthodoxy.audit.completed— Audit finished (consumed by Vault Keepers)
7️⃣ TRUTH Layer: Vault Keepers (Archival & Persistence)
Responsibilities:
- Archive evidence and enriched metadata
- Create state snapshots
- Verify data integrity (hash validation)
- Manage backup and recovery
Consumer Group: vault_keepers
Consumed Events:
orthodoxy.audit.completed— Audit events for archivalmemory.sync.completed— Sync events for snapshot triggers
Processing Pipeline:
- ArchiveConsumer — Evidence archival
- SnapshotConsumer — State snapshot creation
- IntegrityConsumer — Hash verification & integrity checks
PostgreSQL Tables:
vault_archives— Archived evidence and metadatasnapshots— Point-in-time state snapshotsintegrity_checks— Hash validation records
StreamBus Events Emitted:
vault.archive.completed— Archival finishedvault.snapshot.created— Snapshot savedvault.integrity.validated— Integrity check passed
🔑 Key Integration Points
Who Embeds to Qdrant?
- Codex Hunters →
codex_entitiescollection (via Embedding API) - Pattern Weavers →
pattern_taxonomycollection (via Embedding API) - Babel Gardens →
babel_embeddingscollection (direct QdrantAgent)
Who Logs to PostgreSQL?
- Oculus Prime →
evidence_packs,intake_event_log,intake_event_failures - Codex Hunters →
entities,entity_metadata,entity_relations(ontological edges from FK data) - Pattern Weavers →
patterns,ontology_mappings,entity_relations(ontological edges from LLM) - Memory Orders →
coherence_reports,health_metrics - Orthodoxy Wardens →
audit_logs,compliance_reports - Vault Keepers →
vault_archives,snapshots,integrity_checks
Event Flow Summary
🚀 Migration & Compatibility
Event Naming Migration (v1 → v2)
- v1 Legacy:
intake.evidence.created - v2 Canonical:
oculus_prime.evidence.created - Migration Mode: Controlled by
OCULUS_PRIME_EVENT_MIGRATION_MODEenv varv1_only— Emit only legacy channelv2_only— Emit only canonical channeldual_write— Emit both (default for gradual rollout)
Consumer Configuration
Codex Hunters listeners support:
OCULUS_PRIME_EVENT_MIGRATION_MODE— Which channel(s) Oculus Prime emitsCODEX_OCULUS_CONSUME_LEGACY_ALIAS— Whether to consume legacyintake.*channel
📚 References
- Oculus Prime Implementation:
/infrastructure/edge/oculus_prime/ - Event Emitter:
/infrastructure/edge/oculus_prime/core/event_emitter.py - Codex Hunters Consumer:
/services/api_codex_hunters/streams_listener.py - Pattern Weavers Consumer:
/services/api_pattern_weavers/adapters/bus_adapter.py - Memory Orders Coherence:
/services/api_memory_orders/adapters/bus_adapter.py - Vault Keepers Archival:
/services/api_vault_keepers/adapters/bus_adapter.py - StreamBus Transport:
/vitruvyan_core/core/synaptic_conclave/transport/streams.py