Phase 11: Embedding versioning — model-proof vector layer
- IndexRegistry: tracks all vector indexes with model metadata
(model_name, model_version, dimensions, build stats)
- Index metadata persisted as JSON in vectors/meta/
- Rebuilt on startup for crash recovery
- GET /vectors/indexes — list all indexes (filter by source/model)
- GET /vectors/indexes/{name} — get index metadata
- Background jobs auto-register metadata on completion
- Multi-version support: same data, different models, coexist
- Per ADR-014: enables incremental re-embed on model upgrade
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6d49f81ebf
commit
6cd1daeb51
@ -57,10 +57,15 @@ async fn main() {
|
|||||||
store: store.clone(),
|
store: store.clone(),
|
||||||
registry: registry.clone(),
|
registry: registry.clone(),
|
||||||
}))
|
}))
|
||||||
.nest("/vectors", vectord::service::router(vectord::service::VectorState {
|
.nest("/vectors", vectord::service::router({
|
||||||
store: store.clone(),
|
let index_reg = vectord::index_registry::IndexRegistry::new(store.clone());
|
||||||
ai_client: ai_client.clone(),
|
let _ = index_reg.rebuild().await;
|
||||||
job_tracker: vectord::jobs::JobTracker::new(),
|
vectord::service::VectorState {
|
||||||
|
store: store.clone(),
|
||||||
|
ai_client: ai_client.clone(),
|
||||||
|
job_tracker: vectord::jobs::JobTracker::new(),
|
||||||
|
index_registry: index_reg,
|
||||||
|
}
|
||||||
}))
|
}))
|
||||||
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
|
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
|
||||||
.nest("/journal", journald::service::router(journal));
|
.nest("/journal", journald::service::router(journal));
|
||||||
|
|||||||
112
crates/vectord/src/index_registry.rs
Normal file
112
crates/vectord/src/index_registry.rs
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
//! Vector index registry — tracks all indexes with model versioning.
//!
//! Each index knows which model created it, enabling:
//! - Multi-version indexes (same data, different models, coexist)
//! - Incremental re-embed (only new/changed docs on model upgrade)
//! - A/B search comparison between model versions
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use object_store::ObjectStore;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
use storaged::ops;
|
||||||
|
|
||||||
|
/// Metadata for a vector index.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct IndexMeta {
|
||||||
|
pub index_name: String,
|
||||||
|
pub source: String, // dataset this was built from
|
||||||
|
pub model_name: String, // "nomic-embed-text"
|
||||||
|
pub model_version: String, // "latest" or specific version
|
||||||
|
pub dimensions: u32, // 768
|
||||||
|
pub chunk_count: usize,
|
||||||
|
pub doc_count: usize,
|
||||||
|
pub chunk_size: usize,
|
||||||
|
pub overlap: usize,
|
||||||
|
pub storage_key: String, // "vectors/resumes_v1_nomic.parquet"
|
||||||
|
pub created_at: DateTime<Utc>,
|
||||||
|
pub build_time_secs: f32,
|
||||||
|
pub chunks_per_sec: f32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registry of all vector indexes.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct IndexRegistry {
|
||||||
|
indexes: Arc<RwLock<HashMap<String, IndexMeta>>>,
|
||||||
|
store: Arc<dyn ObjectStore>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IndexRegistry {
|
||||||
|
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
|
||||||
|
Self {
|
||||||
|
indexes: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rebuild from persisted index metadata on startup.
|
||||||
|
pub async fn rebuild(&self) -> Result<usize, String> {
|
||||||
|
let keys = ops::list(&self.store, Some("vectors/meta/")).await?;
|
||||||
|
let mut reg = self.indexes.write().await;
|
||||||
|
reg.clear();
|
||||||
|
|
||||||
|
for key in &keys {
|
||||||
|
if !key.ends_with(".json") { continue; }
|
||||||
|
let data = ops::get(&self.store, key).await?;
|
||||||
|
match serde_json::from_slice::<IndexMeta>(&data) {
|
||||||
|
Ok(meta) => { reg.insert(meta.index_name.clone(), meta); }
|
||||||
|
Err(e) => tracing::warn!("failed to load index meta {key}: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = reg.len();
|
||||||
|
if count > 0 {
|
||||||
|
tracing::info!("loaded {count} vector index metadata entries");
|
||||||
|
}
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a new index.
|
||||||
|
pub async fn register(&self, meta: IndexMeta) -> Result<(), String> {
|
||||||
|
let key = format!("vectors/meta/{}.json", meta.index_name);
|
||||||
|
let json = serde_json::to_vec_pretty(&meta).map_err(|e| e.to_string())?;
|
||||||
|
ops::put(&self.store, &key, json.into()).await?;
|
||||||
|
self.indexes.write().await.insert(meta.index_name.clone(), meta);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get metadata for an index.
|
||||||
|
pub async fn get(&self, index_name: &str) -> Option<IndexMeta> {
|
||||||
|
self.indexes.read().await.get(index_name).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all indexes, optionally filtered by source or model.
|
||||||
|
pub async fn list(&self, source: Option<&str>, model: Option<&str>) -> Vec<IndexMeta> {
|
||||||
|
self.indexes.read().await.values()
|
||||||
|
.filter(|m| source.map_or(true, |s| m.source == s))
|
||||||
|
.filter(|m| model.map_or(true, |mo| m.model_name == mo))
|
||||||
|
.cloned()
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find all versions of an index for a given source dataset.
|
||||||
|
/// Returns indexes sorted by creation time (newest first).
|
||||||
|
pub async fn versions_for_source(&self, source: &str) -> Vec<IndexMeta> {
|
||||||
|
let mut versions: Vec<IndexMeta> = self.indexes.read().await.values()
|
||||||
|
.filter(|m| m.source == source)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
versions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
|
||||||
|
versions
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete an index (metadata only — vector Parquet stays for safety).
|
||||||
|
pub async fn delete(&self, index_name: &str) -> Result<(), String> {
|
||||||
|
let key = format!("vectors/meta/{index_name}.json");
|
||||||
|
ops::delete(&self.store, &key).await?;
|
||||||
|
self.indexes.write().await.remove(index_name);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,4 +1,5 @@
|
|||||||
pub mod chunker;
|
pub mod chunker;
|
||||||
|
pub mod index_registry;
|
||||||
pub mod jobs;
|
pub mod jobs;
|
||||||
pub mod store;
|
pub mod store;
|
||||||
pub mod search;
|
pub mod search;
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
use axum::{
|
use axum::{
|
||||||
Json, Router,
|
Json, Router,
|
||||||
extract::{Path, State},
|
extract::{Path, Query, State},
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
response::IntoResponse,
|
response::IntoResponse,
|
||||||
routing::{get, post},
|
routing::{get, post},
|
||||||
@ -10,19 +10,22 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use aibridge::client::{AiClient, EmbedRequest};
|
use aibridge::client::{AiClient, EmbedRequest};
|
||||||
use crate::{chunker, jobs, rag, search, store, supervisor};
|
use crate::{chunker, index_registry, jobs, rag, search, store, supervisor};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct VectorState {
|
pub struct VectorState {
|
||||||
pub store: Arc<dyn ObjectStore>,
|
pub store: Arc<dyn ObjectStore>,
|
||||||
pub ai_client: AiClient,
|
pub ai_client: AiClient,
|
||||||
pub job_tracker: jobs::JobTracker,
|
pub job_tracker: jobs::JobTracker,
|
||||||
|
pub index_registry: index_registry::IndexRegistry,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn router(state: VectorState) -> Router {
|
pub fn router(state: VectorState) -> Router {
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/health", get(health))
|
.route("/health", get(health))
|
||||||
.route("/index", post(create_index))
|
.route("/index", post(create_index))
|
||||||
|
.route("/indexes", get(list_indexes))
|
||||||
|
.route("/indexes/{name}", get(get_index_meta))
|
||||||
.route("/jobs", get(list_jobs))
|
.route("/jobs", get(list_jobs))
|
||||||
.route("/jobs/{id}", get(get_job))
|
.route("/jobs/{id}", get(get_job))
|
||||||
.route("/search", post(search_index))
|
.route("/search", post(search_index))
|
||||||
@ -88,17 +91,42 @@ async fn create_index(
|
|||||||
let tracker = state.job_tracker.clone();
|
let tracker = state.job_tracker.clone();
|
||||||
let ai_client = state.ai_client.clone();
|
let ai_client = state.ai_client.clone();
|
||||||
let obj_store = state.store.clone();
|
let obj_store = state.store.clone();
|
||||||
|
let registry = state.index_registry.clone();
|
||||||
let jid = job_id.clone();
|
let jid = job_id.clone();
|
||||||
|
let source_name = req.source.clone();
|
||||||
|
let idx_name = req.index_name.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
let start_time = std::time::Instant::now();
|
||||||
let config = supervisor::SupervisorConfig::default();
|
let config = supervisor::SupervisorConfig::default();
|
||||||
let result = supervisor::run_supervised(
|
let result = supervisor::run_supervised(
|
||||||
&jid, &index_name, chunks, &ai_client, &obj_store, &tracker, config,
|
&jid, &idx_name, chunks, &ai_client, &obj_store, &tracker, config,
|
||||||
).await;
|
).await;
|
||||||
match result {
|
match result {
|
||||||
Ok(key) => {
|
Ok(key) => {
|
||||||
|
let elapsed = start_time.elapsed().as_secs_f32();
|
||||||
|
let rate = if elapsed > 0.0 { n_chunks as f32 / elapsed } else { 0.0 };
|
||||||
|
|
||||||
|
// Register index metadata with model version info
|
||||||
|
let meta = index_registry::IndexMeta {
|
||||||
|
index_name: idx_name.clone(),
|
||||||
|
source: source_name,
|
||||||
|
model_name: "nomic-embed-text".to_string(), // from sidecar config
|
||||||
|
model_version: "latest".to_string(),
|
||||||
|
dimensions: 768,
|
||||||
|
chunk_count: n_chunks,
|
||||||
|
doc_count: n_docs,
|
||||||
|
chunk_size: chunk_size,
|
||||||
|
overlap: overlap,
|
||||||
|
storage_key: key.clone(),
|
||||||
|
created_at: chrono::Utc::now(),
|
||||||
|
build_time_secs: elapsed,
|
||||||
|
chunks_per_sec: rate,
|
||||||
|
};
|
||||||
|
let _ = registry.register(meta).await;
|
||||||
|
|
||||||
tracker.complete(&jid, key).await;
|
tracker.complete(&jid, key).await;
|
||||||
tracing::info!("job {jid}: completed");
|
tracing::info!("job {jid}: completed — {n_chunks} chunks in {elapsed:.0}s ({rate:.0}/sec)");
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracker.fail(&jid, e.clone()).await;
|
tracker.fail(&jid, e.clone()).await;
|
||||||
@ -116,8 +144,37 @@ async fn create_index(
|
|||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the actual embedding work in background.
|
// --- Index Registry ---
|
||||||
async fn run_embedding_job(
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct IndexListQuery {
|
||||||
|
source: Option<String>,
|
||||||
|
model: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_indexes(
|
||||||
|
State(state): State<VectorState>,
|
||||||
|
Query(q): Query<IndexListQuery>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let indexes = state.index_registry.list(q.source.as_deref(), q.model.as_deref()).await;
|
||||||
|
Json(indexes)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_index_meta(
|
||||||
|
State(state): State<VectorState>,
|
||||||
|
Path(name): Path<String>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match state.index_registry.get(&name).await {
|
||||||
|
Some(meta) => Ok(Json(meta)),
|
||||||
|
None => Err((StatusCode::NOT_FOUND, format!("index not found: {name}"))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- unused legacy function below, kept for reference ---
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
/// Legacy single-pipeline embedding (replaced by supervisor).
|
||||||
|
async fn _run_embedding_job_legacy(
|
||||||
job_id: &str,
|
job_id: &str,
|
||||||
index_name: &str,
|
index_name: &str,
|
||||||
chunks: &[chunker::TextChunk],
|
chunks: &[chunker::TextChunk],
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user