lakehouse/crates/vectord/src/index_registry.rs
root 937569d188 ADR-020: Universal ID mapping — fix the flat embedding identity problem
THE REAL PROBLEM: Every new data source produces different doc_id
prefixes in vector indexes (W-, W500K-, W5K-, CAND-). Hybrid search
had to hardcode strip_prefix for each one. New datasets broke hybrid
search until someone added another prefix. This violates "any data source
without pre-defined schemas."

THE FIX: IndexMeta.id_prefix — the catalog records what prefix each
index uses. Hybrid search reads it and strips automatically. Legacy
indexes fall back to heuristic stripping. New indexes can set
id_prefix=None to use raw IDs (no prefix, no stripping needed).

This means: ingest a new dataset, embed it, hybrid search works
immediately without code changes. The system is truly source-agnostic.

Also: full ADR document at docs/ADR-020-universal-id-mapping.md
with the three options considered and rationale for the chosen approach.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-17 11:58:18 -05:00

132 lines
4.9 KiB
Rust

//! Vector index registry — tracks all indexes with model versioning.
//! Each index knows which model created it, enabling:
//! - Multi-version indexes (same data, different models, coexist)
//! - Incremental re-embed (only new/changed docs on model upgrade)
//! - A/B search comparison between model versions
//!
//! (These were outer `///` comments attached to the first `use` item,
//! which rustdoc renders nowhere for a private import; as `//!` inner
//! docs they document the module itself.)

use std::collections::HashMap;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;

use storaged::ops;
/// Metadata for a vector index.
/// Metadata for a vector index.
///
/// Persisted as pretty-printed JSON under `vectors/meta/<index_name>.json`
/// (see `IndexRegistry::register`), so field names and serde attributes are
/// part of the on-disk format — keep them stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMeta {
    // Unique name; used as the registry key and the metadata file name.
    pub index_name: String,
    pub source: String, // dataset this was built from
    pub model_name: String, // "nomic-embed-text"
    pub model_version: String, // "latest" or specific version
    pub dimensions: u32, // 768
    // Number of embedded chunks vs. number of source documents.
    pub chunk_count: usize,
    pub doc_count: usize,
    // Chunking parameters used at build time (characters/tokens —
    // unit not visible here; confirm against the embedding pipeline).
    pub chunk_size: usize,
    pub overlap: usize,
    pub storage_key: String, // "vectors/resumes_v1_nomic.parquet"
    pub created_at: DateTime<Utc>,
    // Build throughput stats recorded at index-creation time.
    pub build_time_secs: f32,
    pub chunks_per_sec: f32,
    /// Federation layer 2: which bucket holds this index's artifacts
    /// (trial journal + promotion file). Defaults to "primary" for
    /// pre-federation indexes — the serde default keeps old metadata
    /// files readable without migration.
    #[serde(default = "default_bucket")]
    pub bucket: String,
    /// ADR-019: which physical backend stores this index. `Parquet`
    /// means storage_key points at our binary-blob Parquet file;
    /// `Lance` means it points at a Lance dataset directory.
    #[serde(default)]
    pub vector_backend: shared::types::VectorBackend,
    /// ADR-020: prefix prepended to doc_ids during embedding. If set,
    /// hybrid search strips this prefix to match against SQL primary keys.
    /// None = doc_ids ARE the raw primary keys (no stripping needed).
    /// Existing indexes: "W-", "CAND-", "W500K-", etc.
    #[serde(default)]
    pub id_prefix: Option<String>,
}
fn default_bucket() -> String { "primary".to_string() }
/// Registry of all vector indexes.
/// Registry of all vector indexes.
///
/// In-memory map of index metadata, mirrored to the object store as JSON
/// files under `vectors/meta/` so it can be rebuilt on startup. `Clone` is
/// cheap: both fields are `Arc`s, so clones share the same state.
#[derive(Clone)]
pub struct IndexRegistry {
    // index_name -> metadata; guarded for concurrent read/write access.
    indexes: Arc<RwLock<HashMap<String, IndexMeta>>>,
    // Object store holding the metadata JSON (and the vector artifacts
    // the metadata points at).
    store: Arc<dyn ObjectStore>,
}
impl IndexRegistry {
    /// Create an empty registry backed by `store`; call [`Self::rebuild`]
    /// to repopulate it from persisted metadata.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            indexes: Arc::new(RwLock::new(HashMap::new())),
            store,
        }
    }

    /// Rebuild from persisted index metadata on startup.
    ///
    /// Any single metadata file that cannot be fetched or parsed is logged
    /// and skipped, so one corrupt object can no longer abort the whole
    /// rebuild. (Previously a failed `get` returned early with `?` while a
    /// parse failure only warned — inconsistent handling of the same kind
    /// of per-file damage.) Returns the number of entries loaded; only the
    /// initial listing of `vectors/meta/` is still fatal.
    pub async fn rebuild(&self) -> Result<usize, String> {
        let keys = ops::list(&self.store, Some("vectors/meta/")).await?;
        let mut reg = self.indexes.write().await;
        reg.clear();
        for key in keys.iter().filter(|k| k.ends_with(".json")) {
            // Best-effort per file: warn and continue on fetch failure,
            // matching the parse-failure policy below.
            let data = match ops::get(&self.store, key).await {
                Ok(bytes) => bytes,
                Err(e) => {
                    tracing::warn!("failed to read index meta {key}: {e}");
                    continue;
                }
            };
            match serde_json::from_slice::<IndexMeta>(&data) {
                Ok(meta) => {
                    reg.insert(meta.index_name.clone(), meta);
                }
                Err(e) => tracing::warn!("failed to load index meta {key}: {e}"),
            }
        }
        let count = reg.len();
        if count > 0 {
            tracing::info!("loaded {count} vector index metadata entries");
        }
        Ok(count)
    }

    /// Register a new index.
    ///
    /// Persists the metadata JSON first, then updates the in-memory map,
    /// so the map never references metadata that was not durably written.
    pub async fn register(&self, meta: IndexMeta) -> Result<(), String> {
        let key = format!("vectors/meta/{}.json", meta.index_name);
        let json = serde_json::to_vec_pretty(&meta).map_err(|e| e.to_string())?;
        ops::put(&self.store, &key, json.into()).await?;
        self.indexes.write().await.insert(meta.index_name.clone(), meta);
        Ok(())
    }

    /// Get metadata for an index, or `None` if it is not registered.
    pub async fn get(&self, index_name: &str) -> Option<IndexMeta> {
        self.indexes.read().await.get(index_name).cloned()
    }

    /// List all indexes, optionally filtered by source and/or model.
    /// A `None` filter matches everything. Order is unspecified
    /// (HashMap iteration order).
    pub async fn list(&self, source: Option<&str>, model: Option<&str>) -> Vec<IndexMeta> {
        self.indexes
            .read()
            .await
            .values()
            .filter(|m| source.map_or(true, |s| m.source == s))
            .filter(|m| model.map_or(true, |mo| m.model_name == mo))
            .cloned()
            .collect()
    }

    /// Find all versions of an index for a given source dataset.
    /// Returns indexes sorted by creation time (newest first).
    pub async fn versions_for_source(&self, source: &str) -> Vec<IndexMeta> {
        let mut versions: Vec<IndexMeta> = self
            .indexes
            .read()
            .await
            .values()
            .filter(|m| m.source == source)
            .cloned()
            .collect();
        // Unstable sort: input order is HashMap iteration order, so there
        // is no meaningful order to preserve among equal timestamps.
        versions.sort_unstable_by(|a, b| b.created_at.cmp(&a.created_at));
        versions
    }

    /// Delete an index (metadata only — vector Parquet stays for safety).
    pub async fn delete(&self, index_name: &str) -> Result<(), String> {
        let key = format!("vectors/meta/{index_name}.json");
        ops::delete(&self.store, &key).await?;
        self.indexes.write().await.remove(index_name);
        Ok(())
    }
}