diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 728d559..1aa588e 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -57,10 +57,15 @@ async fn main() { store: store.clone(), registry: registry.clone(), })) - .nest("/vectors", vectord::service::router(vectord::service::VectorState { - store: store.clone(), - ai_client: ai_client.clone(), - job_tracker: vectord::jobs::JobTracker::new(), + .nest("/vectors", vectord::service::router({ + let index_reg = vectord::index_registry::IndexRegistry::new(store.clone()); + let _ = index_reg.rebuild().await; + vectord::service::VectorState { + store: store.clone(), + ai_client: ai_client.clone(), + job_tracker: vectord::jobs::JobTracker::new(), + index_registry: index_reg, + } })) .nest("/workspaces", queryd::workspace_service::router(workspace_mgr)) .nest("/journal", journald::service::router(journal)); diff --git a/crates/vectord/src/index_registry.rs b/crates/vectord/src/index_registry.rs new file mode 100644 index 0000000..7bc3367 --- /dev/null +++ b/crates/vectord/src/index_registry.rs @@ -0,0 +1,112 @@ +/// Vector index registry — tracks all indexes with model versioning. +/// Each index knows which model created it, enabling: +/// - Multi-version indexes (same data, different models, coexist) +/// - Incremental re-embed (only new/changed docs on model upgrade) +/// - A/B search comparison between model versions + +use chrono::{DateTime, Utc}; +use object_store::ObjectStore; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +use storaged::ops; + +/// Metadata for a vector index. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IndexMeta { + pub index_name: String, + pub source: String, // dataset this was built from + pub model_name: String, // "nomic-embed-text" + pub model_version: String, // "latest" or specific version + pub dimensions: u32, // 768 + pub chunk_count: usize, + pub doc_count: usize, + pub chunk_size: usize, + pub overlap: usize, + pub storage_key: String, // "vectors/resumes_v1_nomic.parquet" + pub created_at: DateTime, + pub build_time_secs: f32, + pub chunks_per_sec: f32, +} + +/// Registry of all vector indexes. +#[derive(Clone)] +pub struct IndexRegistry { + indexes: Arc>>, + store: Arc, +} + +impl IndexRegistry { + pub fn new(store: Arc) -> Self { + Self { + indexes: Arc::new(RwLock::new(HashMap::new())), + store, + } + } + + /// Rebuild from persisted index metadata on startup. + pub async fn rebuild(&self) -> Result { + let keys = ops::list(&self.store, Some("vectors/meta/")).await?; + let mut reg = self.indexes.write().await; + reg.clear(); + + for key in &keys { + if !key.ends_with(".json") { continue; } + let data = ops::get(&self.store, key).await?; + match serde_json::from_slice::(&data) { + Ok(meta) => { reg.insert(meta.index_name.clone(), meta); } + Err(e) => tracing::warn!("failed to load index meta {key}: {e}"), + } + } + + let count = reg.len(); + if count > 0 { + tracing::info!("loaded {count} vector index metadata entries"); + } + Ok(count) + } + + /// Register a new index. + pub async fn register(&self, meta: IndexMeta) -> Result<(), String> { + let key = format!("vectors/meta/{}.json", meta.index_name); + let json = serde_json::to_vec_pretty(&meta).map_err(|e| e.to_string())?; + ops::put(&self.store, &key, json.into()).await?; + self.indexes.write().await.insert(meta.index_name.clone(), meta); + Ok(()) + } + + /// Get metadata for an index. + pub async fn get(&self, index_name: &str) -> Option { + self.indexes.read().await.get(index_name).cloned() + } + + /// List all indexes, optionally filtered by source or model. + pub async fn list(&self, source: Option<&str>, model: Option<&str>) -> Vec { + self.indexes.read().await.values() + .filter(|m| source.map_or(true, |s| m.source == s)) + .filter(|m| model.map_or(true, |mo| m.model_name == mo)) + .cloned() + .collect() + } + + /// Find all versions of an index for a given source dataset. + /// Returns indexes sorted by creation time (newest first). + pub async fn versions_for_source(&self, source: &str) -> Vec { + let mut versions: Vec = self.indexes.read().await.values() + .filter(|m| m.source == source) + .cloned() + .collect(); + versions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); + versions + } + + /// Delete an index (metadata only — vector Parquet stays for safety). + pub async fn delete(&self, index_name: &str) -> Result<(), String> { + let key = format!("vectors/meta/{index_name}.json"); + ops::delete(&self.store, &key).await?; + self.indexes.write().await.remove(index_name); + Ok(()) + } +} diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs index 4c138f3..943e2b5 100644 --- a/crates/vectord/src/lib.rs +++ b/crates/vectord/src/lib.rs @@ -1,4 +1,5 @@ pub mod chunker; +pub mod index_registry; pub mod jobs; pub mod store; pub mod search; diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index f193202..2734ff0 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -1,6 +1,6 @@ use axum::{ Json, Router, - extract::{Path, State}, + extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{get, post}, @@ -10,19 +10,22 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use aibridge::client::{AiClient, EmbedRequest}; -use crate::{chunker, jobs, rag, search, store, supervisor}; +use crate::{chunker, index_registry, jobs, rag, search, store, supervisor}; #[derive(Clone)] pub struct VectorState { pub store: Arc, pub ai_client: AiClient, pub job_tracker: jobs::JobTracker, + pub index_registry: index_registry::IndexRegistry, } pub fn router(state: VectorState) -> Router { Router::new() .route("/health", get(health)) .route("/index", post(create_index)) + .route("/indexes", get(list_indexes)) + .route("/indexes/{name}", get(get_index_meta)) .route("/jobs", get(list_jobs)) .route("/jobs/{id}", get(get_job)) .route("/search", post(search_index)) @@ -88,17 +91,42 @@ async fn create_index( let tracker = state.job_tracker.clone(); let ai_client = state.ai_client.clone(); let obj_store = state.store.clone(); + let registry = state.index_registry.clone(); let jid = job_id.clone(); + let source_name = req.source.clone(); + let idx_name = req.index_name.clone(); tokio::spawn(async move { + let start_time = std::time::Instant::now(); let config = supervisor::SupervisorConfig::default(); let result = supervisor::run_supervised( - &jid, &index_name, chunks, &ai_client, &obj_store, &tracker, config, + &jid, &idx_name, chunks, &ai_client, &obj_store, &tracker, config, ).await; match result { Ok(key) => { + let elapsed = start_time.elapsed().as_secs_f32(); + let rate = if elapsed > 0.0 { n_chunks as f32 / elapsed } else { 0.0 }; + + // Register index metadata with model version info + let meta = index_registry::IndexMeta { + index_name: idx_name.clone(), + source: source_name, + model_name: "nomic-embed-text".to_string(), // from sidecar config + model_version: "latest".to_string(), + dimensions: 768, + chunk_count: n_chunks, + doc_count: n_docs, + chunk_size: chunk_size, + overlap: overlap, + storage_key: key.clone(), + created_at: chrono::Utc::now(), + build_time_secs: elapsed, + chunks_per_sec: rate, + }; + let _ = registry.register(meta).await; + tracker.complete(&jid, key).await; - tracing::info!("job {jid}: completed"); + tracing::info!("job {jid}: completed — {n_chunks} chunks in {elapsed:.0}s ({rate:.0}/sec)"); } Err(e) => { tracker.fail(&jid, e.clone()).await; @@ -116,8 +144,37 @@ async fn create_index( }))) } -/// Run the actual embedding work in background. -async fn run_embedding_job( +// --- Index Registry --- + +#[derive(Deserialize)] +struct IndexListQuery { + source: Option, + model: Option, +} + +async fn list_indexes( + State(state): State, + Query(q): Query, +) -> impl IntoResponse { + let indexes = state.index_registry.list(q.source.as_deref(), q.model.as_deref()).await; + Json(indexes) +} + +async fn get_index_meta( + State(state): State, + Path(name): Path, +) -> impl IntoResponse { + match state.index_registry.get(&name).await { + Some(meta) => Ok(Json(meta)), + None => Err((StatusCode::NOT_FOUND, format!("index not found: {name}"))), + } +} + +// --- unused legacy function below, kept for reference --- + +#[allow(dead_code)] +/// Legacy single-pipeline embedding (replaced by supervisor). +async fn _run_embedding_job_legacy( job_id: &str, index_name: &str, chunks: &[chunker::TextChunk],