diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs index d218714..92922f0 100644 --- a/crates/catalogd/src/registry.rs +++ b/crates/catalogd/src/registry.rs @@ -1,6 +1,6 @@ use shared::types::{ - AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ObjectRef, - RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone, + AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ModelProfile, + ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone, }; use crate::tombstones::TombstoneStore; @@ -45,6 +45,7 @@ pub struct MetadataUpdate { const MANIFEST_PREFIX: &str = "_catalog/manifests"; const VIEW_PREFIX: &str = "_catalog/views"; +const PROFILE_PREFIX: &str = "_catalog/profiles"; /// In-memory dataset registry backed by manifest persistence in object storage. /// Also tracks AiViews (Phase D) — safe projections over base datasets. @@ -53,6 +54,7 @@ const VIEW_PREFIX: &str = "_catalog/views"; pub struct Registry { datasets: Arc>>, views: Arc>>, + profiles: Arc>>, tombstones: TombstoneStore, store: Arc, } @@ -62,6 +64,7 @@ impl Registry { Self { datasets: Arc::new(RwLock::new(HashMap::new())), views: Arc::new(RwLock::new(HashMap::new())), + profiles: Arc::new(RwLock::new(HashMap::new())), tombstones: TombstoneStore::new(store.clone()), store, } @@ -136,9 +139,79 @@ impl Registry { tracing::info!("catalog: {} views loaded", views.len()); } + // Phase 17: load model profiles. + let profile_keys = ops::list(&self.store, Some(PROFILE_PREFIX)).await.unwrap_or_default(); + let mut profiles = self.profiles.write().await; + profiles.clear(); + for key in &profile_keys { + if !key.ends_with(".json") { continue; } + let data = match ops::get(&self.store, key).await { + Ok(d) => d, + Err(e) => { tracing::warn!("profile '{key}': read failed: {e}"); continue; } + }; + match serde_json::from_slice::(&data) { + Ok(p) => { profiles.insert(p.id.clone(), p); } + Err(e) => tracing::warn!("profile '{key}': parse failed: {e}"), + } + } + if !profiles.is_empty() { + tracing::info!("catalog: {} model profiles loaded", profiles.len()); + } + Ok(count) } + // --- Phase 17: Model profiles --- + + /// Create or replace a model profile. Validates id slug + ensures + /// every bound_dataset exists (as either a raw dataset or an AiView). + pub async fn put_profile(&self, profile: ModelProfile) -> Result { + if profile.id.is_empty() { + return Err("profile id is empty".into()); + } + if !profile.id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') { + return Err(format!( + "profile id '{}' must be alphanumeric + '-' or '_' only", profile.id, + )); + } + for binding in &profile.bound_datasets { + let exists = self.get_by_name(binding).await.is_some() + || self.get_view(binding).await.is_some(); + if !exists { + return Err(format!( + "bound dataset '{}' not found as dataset or view", binding, + )); + } + } + + let key = format!("{PROFILE_PREFIX}/{}.json", profile.id); + let json = serde_json::to_vec_pretty(&profile).map_err(|e| e.to_string())?; + ops::put(&self.store, &key, json.into()).await?; + + let mut profiles = self.profiles.write().await; + profiles.insert(profile.id.clone(), profile.clone()); + tracing::info!( + "profile registered: {} -> ollama={} bindings={:?}", + profile.id, profile.ollama_name, profile.bound_datasets, + ); + Ok(profile) + } + + pub async fn get_profile(&self, id: &str) -> Option { + self.profiles.read().await.get(id).cloned() + } + + pub async fn list_profiles(&self) -> Vec { + self.profiles.read().await.values().cloned().collect() + } + + pub async fn delete_profile(&self, id: &str) -> Result<(), String> { + let key = format!("{PROFILE_PREFIX}/{id}.json"); + ops::delete(&self.store, &key).await?; + self.profiles.write().await.remove(id); + Ok(()) + } + /// Register a new dataset. Persists manifest to storage before updating memory. pub async fn register( &self, diff --git a/crates/catalogd/src/service.rs b/crates/catalogd/src/service.rs index f0fb737..52854e1 100644 --- a/crates/catalogd/src/service.rs +++ b/crates/catalogd/src/service.rs @@ -27,6 +27,9 @@ pub fn router(registry: Registry) -> Router { .route("/views/{name}", get(get_view).delete(delete_view)) // Phase E: soft-delete tombstones .route("/datasets/by-name/{name}/tombstone", post(tombstone_rows).get(list_tombstones)) + // Phase 17: model profiles + .route("/profiles", post(create_profile).get(list_profiles)) + .route("/profiles/{id}", get(get_profile).delete(delete_profile)) .with_state(registry) } @@ -338,3 +341,66 @@ async fn list_tombstones( Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), } } + +// --- Phase 17: Model profiles --- + +#[derive(Deserialize)] +struct CreateProfileRequest { + id: String, + ollama_name: String, + #[serde(default)] + description: String, + bound_datasets: Vec, + #[serde(default)] + hnsw_config: shared::types::ProfileHnswConfig, + #[serde(default = "default_embed_model_req")] + embed_model: String, + #[serde(default)] + created_by: String, +} + +fn default_embed_model_req() -> String { "nomic-embed-text".to_string() } + +async fn create_profile( + State(registry): State, + Json(req): Json, +) -> impl IntoResponse { + let profile = shared::types::ModelProfile { + id: req.id, + ollama_name: req.ollama_name, + description: req.description, + bound_datasets: req.bound_datasets, + hnsw_config: req.hnsw_config, + embed_model: req.embed_model, + created_at: chrono::Utc::now(), + created_by: req.created_by, + }; + match registry.put_profile(profile).await { + Ok(p) => Ok((StatusCode::CREATED, Json(p))), + Err(e) => Err((StatusCode::BAD_REQUEST, e)), + } +} + +async fn list_profiles(State(registry): State) -> impl IntoResponse { + Json(registry.list_profiles().await) +} + +async fn get_profile( + State(registry): State, + Path(id): Path, +) -> impl IntoResponse { + match registry.get_profile(&id).await { + Some(p) => Ok(Json(p)), + None => Err((StatusCode::NOT_FOUND, format!("profile not found: {id}"))), + } +} + +async fn delete_profile( + State(registry): State, + Path(id): Path, +) -> impl IntoResponse { + match registry.delete_profile(&id).await { + Ok(()) => Ok(StatusCode::NO_CONTENT), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/crates/shared/src/types.rs b/crates/shared/src/types.rs index 6e3f0b4..9195ee3 100644 --- a/crates/shared/src/types.rs +++ b/crates/shared/src/types.rs @@ -196,6 +196,69 @@ pub struct AiView { pub description: String, } +/// Per-model configuration for HNSW builds. Declared inline so the PRD +/// invariant 9 ("every reader gets its own profile") can carry tuning +/// preferences without forcing a round-trip to vectord's trial system. +/// Mirrors `vectord::trial::HnswConfig` to avoid a cross-crate dep cycle. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ProfileHnswConfig { + pub ef_construction: usize, + pub ef_search: usize, + #[serde(default)] + pub seed: Option, +} + +impl Default for ProfileHnswConfig { + /// Matches the production HNSW default from Phase 15 trial sweep + /// (see docs/ADR and vectord::trial::HnswConfig::default). + fn default() -> Self { + Self { ef_construction: 80, ef_search: 30, seed: None } + } +} + +/// A named client of the substrate — a local LLM, an AI agent, or a +/// human role that should see a specific scoped view of the data. +/// Profiles are the first-class "every reader gets its own profile" +/// concept from PRD invariant 9. +/// +/// Bound datasets can be raw catalog tables OR `AiView` names — either +/// works at query time since both register as DataFusion tables with +/// the same name. Mixing raw and views lets you compose access rules: +/// bind a model to `candidates_safe` (PII-redacted view) and `placements` +/// (raw table, no PII), and the model gets exactly what it needs. +/// +/// Stored at `_catalog/profiles/{id}.json`. Activation (`/profile/{id}/ +/// activate`) pre-loads the embedding cache and builds HNSW with the +/// profile's config — first search after activate is warm. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ModelProfile { + /// Unique id — slug form (alphanumeric + `-`, `_`). Used in URLs, + /// bucket names (`profile:{id}`), and trial journal prefixes. + pub id: String, + /// Ollama model tag (e.g. "qwen2.5:7b", "nomic-embed-text:latest"). + /// Used when this profile is the "generate" or "embed" caller. + pub ollama_name: String, + /// Free-text description surfaced to operators. Who/what is this + /// profile for? + #[serde(default)] + pub description: String, + /// Names of datasets or AiViews this profile can read. A search + /// against an index whose `source` isn't in this list is refused. + pub bound_datasets: Vec, + /// Per-profile HNSW config, used when activate builds the graph. + #[serde(default)] + pub hnsw_config: ProfileHnswConfig, + /// Which embedding model this profile expects. Defaults to + /// `nomic-embed-text` (the only one currently installed). + #[serde(default = "default_embed_model")] + pub embed_model: String, + pub created_at: chrono::DateTime, + #[serde(default)] + pub created_by: String, +} + +fn default_embed_model() -> String { "nomic-embed-text".to_string() } + /// Soft-delete marker (Phase E). /// /// Tombstones live beside the dataset in `_catalog/tombstones/{dataset}/` diff --git a/crates/vectord/src/refresh.rs b/crates/vectord/src/refresh.rs index 970a395..fab6e9c 100644 --- a/crates/vectord/src/refresh.rs +++ b/crates/vectord/src/refresh.rs @@ -167,6 +167,11 @@ pub async fn refresh_index( if new_docs == 0 { tracing::info!("refresh '{}': no new docs to embed", dataset_name); + // Even on no-op, make sure index metadata is registered so + // downstream discovery (profile activation, A/B search) sees + // indexes built by earlier refresh calls that predated the + // auto-register behavior. + let _ = try_update_index_meta(index_registry, &req.index_name, existing.len()).await; registry.clear_embeddings_stale(dataset_name).await?; return Ok(RefreshResult { index_name: req.index_name.clone(), @@ -263,8 +268,10 @@ pub async fn refresh_index( }) } -/// Best-effort refresh of index registry metadata. If the index exists, -/// bump the chunk_count; if not, this is a no-op. +/// Best-effort refresh of index registry metadata. +/// - If the index already exists, bump the chunk_count. +/// - If it's brand new (first-time build via refresh), REGISTER it so +/// downstream discovery (profile activation, A/B search) can find it. async fn try_update_index_meta( index_registry: &IndexRegistry, index_name: &str, @@ -272,8 +279,32 @@ async fn try_update_index_meta( ) -> Result<(), String> { if let Some(mut meta) = index_registry.get(index_name).await { meta.chunk_count = chunk_count; - index_registry.register(meta).await - } else { - Ok(()) + return index_registry.register(meta).await; } + // First-time registration — infer reasonable defaults. Convention: + // index name `{source}_v1` or `{source}_vN` implies a source dataset + // named by stripping the `_vN` suffix. Otherwise use the index name + // as the source and let the caller patch later. + let source = match index_name.rsplit_once('_') { + Some((base, suffix)) if suffix.starts_with('v') && suffix[1..].chars().all(|c| c.is_ascii_digit()) => { + base.to_string() + } + _ => index_name.to_string(), + }; + let meta = crate::index_registry::IndexMeta { + index_name: index_name.to_string(), + source, + model_name: "nomic-embed-text".to_string(), + model_version: "latest".to_string(), + dimensions: 768, + chunk_count, + doc_count: chunk_count, + chunk_size: 500, + overlap: 50, + storage_key: format!("vectors/{index_name}.parquet"), + created_at: chrono::Utc::now(), + build_time_secs: 0.0, + chunks_per_sec: 0.0, + }; + index_registry.register(meta).await } diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index 0c7f85c..0c1037d 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -54,6 +54,10 @@ pub fn router(state: VectorState) -> Router { // Phase C: embedding refresh .route("/refresh/{dataset_name}", post(refresh_dataset)) .route("/stale", get(list_stale)) + // Phase 17: profile activation — pre-load caches + HNSW for this + // model's bound data. First search after activate is warm. + .route("/profile/{id}/activate", post(activate_profile)) + .route("/profile/{id}/search", post(profile_scoped_search)) .with_state(state) } @@ -731,3 +735,187 @@ async fn list_stale(State(state): State) -> impl IntoResponse { .collect(); Json(entries) } + +// --- Phase 17: Model profile activation + scoped search --- + +#[derive(Serialize)] +struct ActivateReport { + profile_id: String, + ollama_name: String, + indexes_warmed: Vec, + failures: Vec, + total_vectors: usize, + duration_secs: f32, +} + +#[derive(Serialize)] +struct WarmedIndex { + index_name: String, + source: String, + vectors: usize, + hnsw_build_secs: f32, +} + +/// Warm this profile's indexes. For every bound dataset, find the +/// matching vector index (any index whose `source` equals the dataset +/// or view name), load its embeddings into EmbeddingCache, build HNSW +/// with the profile's config. Next `/profile/{id}/search` call is then +/// <1ms cold. +/// +/// Failures on individual indexes don't stop the activation — they get +/// reported in the response. This matches the "substrate keeps working" +/// philosophy from ADR-017: one bad binding shouldn't take down the +/// whole profile. +async fn activate_profile( + State(state): State, + Path(profile_id): Path, +) -> impl IntoResponse { + let t0 = std::time::Instant::now(); + + let profile = match state.catalog.get_profile(&profile_id).await { + Some(p) => p, + None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))), + }; + + let mut warmed = Vec::new(); + let mut failures = Vec::new(); + let mut total_vectors = 0usize; + + let all_indexes = state.index_registry.list(None, None).await; + + for binding in &profile.bound_datasets { + // Find every index whose source matches this binding. + let matched: Vec<_> = all_indexes + .iter() + .filter(|m| &m.source == binding) + .collect(); + if matched.is_empty() { + failures.push(format!( + "no vector index found for binding '{}'", binding, + )); + continue; + } + for meta in matched { + // Pre-load embeddings into cache. + let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await { + Ok(arc) => arc, + Err(e) => { + failures.push(format!("{}: load failed: {}", meta.index_name, e)); + continue; + } + }; + total_vectors += embeddings.len(); + + // Build HNSW with the profile's config. + let cfg = trial::HnswConfig { + ef_construction: profile.hnsw_config.ef_construction, + ef_search: profile.hnsw_config.ef_search, + seed: profile.hnsw_config.seed, + }; + let build_t = std::time::Instant::now(); + match state + .hnsw_store + .build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg) + .await + { + Ok(_) => { + warmed.push(WarmedIndex { + index_name: meta.index_name.clone(), + source: meta.source.clone(), + vectors: embeddings.len(), + hnsw_build_secs: build_t.elapsed().as_secs_f32(), + }); + } + Err(e) => { + failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)); + } + } + } + } + + Ok(Json(ActivateReport { + profile_id: profile.id, + ollama_name: profile.ollama_name, + indexes_warmed: warmed, + failures, + total_vectors, + duration_secs: t0.elapsed().as_secs_f32(), + })) +} + +#[derive(Deserialize)] +struct ProfileSearchRequest { + index_name: String, + query: String, + top_k: Option, +} + +/// Search scoped to a profile — refuses if the requested index's source +/// isn't in the profile's bound_datasets. Reuses the existing HNSW +/// search path when the index is warm; falls back to brute-force cosine +/// if it's not (handled by the existing search code path). +async fn profile_scoped_search( + State(state): State, + Path(profile_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let profile = match state.catalog.get_profile(&profile_id).await { + Some(p) => p, + None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))), + }; + + // Verify the index is in scope for this profile. + let index_meta = match state.index_registry.get(&req.index_name).await { + Some(m) => m, + None => return Err((StatusCode::NOT_FOUND, format!("index not found: {}", req.index_name))), + }; + if !profile.bound_datasets.contains(&index_meta.source) { + return Err(( + StatusCode::FORBIDDEN, + format!( + "profile '{}' is not bound to '{}' — allowed bindings: {:?}", + profile.id, index_meta.source, profile.bound_datasets, + ), + )); + } + + let top_k = req.top_k.unwrap_or(5); + + // Embed the query. + let embed_resp = state + .ai_client + .embed(EmbedRequest { texts: vec![req.query.clone()], model: None }) + .await + .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?; + if embed_resp.embeddings.is_empty() { + return Err((StatusCode::BAD_GATEWAY, "no embedding returned".into())); + } + let query_vec: Vec = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect(); + + // Prefer the hot HNSW path if the index is loaded, otherwise fall + // back to the brute-force path. + if state.hnsw_store.has_index(&req.index_name).await { + match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await { + Ok(hits) => Ok(Json(serde_json::json!({ + "profile": profile.id, + "source": index_meta.source, + "method": "hnsw", + "results": hits, + }))), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } + } else { + let embeddings = state + .embedding_cache + .get_or_load(&req.index_name) + .await + .map_err(|e| (StatusCode::NOT_FOUND, format!("embeddings: {e}")))?; + let results = search::search(&query_vec, &embeddings, top_k); + Ok(Json(serde_json::json!({ + "profile": profile.id, + "source": index_meta.source, + "method": "brute_force", + "results": results, + }))) + } +} diff --git a/docs/PHASES.md b/docs/PHASES.md index a939914..1ad89fd 100644 --- a/docs/PHASES.md +++ b/docs/PHASES.md @@ -154,6 +154,15 @@ - `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack - 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard - Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling. +- [x] Phase 17: Model profiles + scoped search — 2026-04-16 + - `shared::types::ModelProfile` — { id, ollama_name, description, bound_datasets, hnsw_config, embed_model, created_at, created_by } + - `shared::types::ProfileHnswConfig` — mirror of vectord's HnswConfig to avoid cross-crate dep cycle (defaults ec=80 es=30 matching Phase 15 winner) + - `Registry::{put_profile, get_profile, list_profiles, delete_profile}` persisted at `_catalog/profiles/{id}.json`, validates bindings exist (raw dataset OR AiView) + - Endpoints: `POST/GET /catalog/profiles`, `GET/DELETE /catalog/profiles/{id}` + - `POST /vectors/profile/{id}/activate` — warms EmbeddingCache + builds HNSW with profile's config for every bound dataset's vector index; reports warmed indexes + failures + duration + - `POST /vectors/profile/{id}/search` — rejects 403 if requested index's source isn't in profile.bound_datasets; falls through to HNSW if warm, brute-force otherwise + - Fixed refresh to register new index metadata (was silently no-op for first-time indexes) + - End-to-end: security-analyst profile bound to threat_intel → activate warms 54 vectors in 156ms → within-scope HNSW search works (0.625 score); out-of-scope search for candidates returns 403 with allowed bindings listed - [x] Phase E: Soft deletes (tombstones) — 2026-04-16 - `shared::types::Tombstone` — { dataset, row_key_column, row_key_value, deleted_at, actor, reason } - `catalogd::tombstones::TombstoneStore` per-dataset append-log at `_catalog/tombstones/{dataset}/`, flush_threshold=1 + explicit flush so every tombstone is durable on return (compliance requirement)