Phase 17: Model profiles + scoped search — the LLM-brain keystone

Implements PRD invariant 9 ("every reader gets its own profile") and
completes the multi-model substrate vision. Local models (or agents)
bind to a named set of datasets; activation pre-loads their vector
indexes into memory; search enforces scope.

Schema (shared::types):
- ModelProfile { id, ollama_name, description, bound_datasets,
                 hnsw_config, embed_model, created_at, created_by }
- ProfileHnswConfig mirrors vectord::trial::HnswConfig to avoid a
  cross-crate dep cycle. Default (ec=80, es=30) matches the Phase 15
  trial winner.
- bound_datasets can reference raw dataset names OR AiView names
  (both register as DataFusion tables with the same name, so mixing
  raw tables and PII-redacted views composes naturally)

Catalog (catalogd::registry):
- put_profile validates id is a slug (alphanumeric + -_ only) and
  every binding resolves to an existing dataset or view
- Persistence at _catalog/profiles/{id}.json, loaded on rebuild
- get_profile / list_profiles / delete_profile

HTTP endpoints:
- POST /catalog/profiles  (create/update)
- GET  /catalog/profiles  (list)
- GET/DELETE /catalog/profiles/{id}
- POST /vectors/profile/{id}/activate  (HNSW hot-load)
- POST /vectors/profile/{id}/search    (scope-enforced)

Activation (vectord::service::activate_profile):
- For each bound dataset, find vector indexes with matching source
- Pre-load embeddings into EmbeddingCache
- Build HNSW with profile's config
- Report warmed indexes + per-binding failures + duration
- Failures on individual bindings don't abort — "substrate keeps
  working" per ADR-017

Scoped search (vectord::service::profile_scoped_search):
- Look up profile, verify index.source ∈ profile.bound_datasets
- Returns 403 with allowed bindings list if out-of-scope
- Uses HNSW if index is warm, brute-force cosine otherwise (graceful
  degradation — no "must activate first" friction)

Bug fix surfaced during testing: vectord::refresh::try_update_index_meta
was a no-op for first-time indexes, so threat_intel_v1 and
kb_team_runs_v1 (both built via refresh after Phase C shipped) didn't
show up in the index registry. Now it auto-infers the source from the
index name convention (`{source}_vN`) and registers new metadata with
reasonable defaults.

End-to-end verified:
- Created security-analyst profile bound to [threat_intel]
- POST /vectors/profile/security-analyst/activate → warmed
  threat_intel_v1 (54 vectors) in 156ms, HNSW built
- Within-scope search: method=hnsw, returned relevant IP indicators
- Out-of-scope: tried to search resumes_100k_v2 (source=candidates)
  → 403 "profile 'security-analyst' is not bound to 'candidates' —
    allowed bindings: [\"threat_intel\"]"
- staffing-recruiter profile created bound to candidates + placements;
  search without activation fell through to brute_force (graceful)

Deferred (Phase 17 followups):
- VRAM-aware activation (unload-then-load via Ollama keep_alive=0)
  — Ollama already handles this; we don't need to reinvent
- Model-identity in audit trail — Phase 13 has role-based audit;
  adding model_id is ~20 LOC when we want it
- Profile bucket pre-load (profile:user bucket mount) — Phase 17.5

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-16 10:09:43 -05:00
parent d87f2ccac6
commit a293502265
6 changed files with 437 additions and 7 deletions

View File

@ -1,6 +1,6 @@
use shared::types::{
AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ObjectRef,
RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone,
AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ModelProfile,
ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone,
};
use crate::tombstones::TombstoneStore;
@ -45,6 +45,7 @@ pub struct MetadataUpdate {
const MANIFEST_PREFIX: &str = "_catalog/manifests";
const VIEW_PREFIX: &str = "_catalog/views";
const PROFILE_PREFIX: &str = "_catalog/profiles";
/// In-memory dataset registry backed by manifest persistence in object storage.
/// Also tracks AiViews (Phase D) — safe projections over base datasets.
@ -53,6 +54,7 @@ const VIEW_PREFIX: &str = "_catalog/views";
pub struct Registry {
datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>,
views: Arc<RwLock<HashMap<String, AiView>>>,
profiles: Arc<RwLock<HashMap<String, ModelProfile>>>,
tombstones: TombstoneStore,
store: Arc<dyn ObjectStore>,
}
@ -62,6 +64,7 @@ impl Registry {
Self {
datasets: Arc::new(RwLock::new(HashMap::new())),
views: Arc::new(RwLock::new(HashMap::new())),
profiles: Arc::new(RwLock::new(HashMap::new())),
tombstones: TombstoneStore::new(store.clone()),
store,
}
@ -136,9 +139,79 @@ impl Registry {
tracing::info!("catalog: {} views loaded", views.len());
}
// Phase 17: load model profiles.
let profile_keys = ops::list(&self.store, Some(PROFILE_PREFIX)).await.unwrap_or_default();
let mut profiles = self.profiles.write().await;
profiles.clear();
for key in &profile_keys {
if !key.ends_with(".json") { continue; }
let data = match ops::get(&self.store, key).await {
Ok(d) => d,
Err(e) => { tracing::warn!("profile '{key}': read failed: {e}"); continue; }
};
match serde_json::from_slice::<ModelProfile>(&data) {
Ok(p) => { profiles.insert(p.id.clone(), p); }
Err(e) => tracing::warn!("profile '{key}': parse failed: {e}"),
}
}
if !profiles.is_empty() {
tracing::info!("catalog: {} model profiles loaded", profiles.len());
}
Ok(count)
}
// --- Phase 17: Model profiles ---
/// Create or replace a model profile. Validates id slug + ensures
/// every bound_dataset exists (as either a raw dataset or an AiView).
pub async fn put_profile(&self, profile: ModelProfile) -> Result<ModelProfile, String> {
if profile.id.is_empty() {
return Err("profile id is empty".into());
}
if !profile.id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') {
return Err(format!(
"profile id '{}' must be alphanumeric + '-' or '_' only", profile.id,
));
}
for binding in &profile.bound_datasets {
let exists = self.get_by_name(binding).await.is_some()
|| self.get_view(binding).await.is_some();
if !exists {
return Err(format!(
"bound dataset '{}' not found as dataset or view", binding,
));
}
}
let key = format!("{PROFILE_PREFIX}/{}.json", profile.id);
let json = serde_json::to_vec_pretty(&profile).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
let mut profiles = self.profiles.write().await;
profiles.insert(profile.id.clone(), profile.clone());
tracing::info!(
"profile registered: {} -> ollama={} bindings={:?}",
profile.id, profile.ollama_name, profile.bound_datasets,
);
Ok(profile)
}
pub async fn get_profile(&self, id: &str) -> Option<ModelProfile> {
self.profiles.read().await.get(id).cloned()
}
pub async fn list_profiles(&self) -> Vec<ModelProfile> {
self.profiles.read().await.values().cloned().collect()
}
pub async fn delete_profile(&self, id: &str) -> Result<(), String> {
let key = format!("{PROFILE_PREFIX}/{id}.json");
ops::delete(&self.store, &key).await?;
self.profiles.write().await.remove(id);
Ok(())
}
/// Register a new dataset. Persists manifest to storage before updating memory.
pub async fn register(
&self,

View File

@ -27,6 +27,9 @@ pub fn router(registry: Registry) -> Router {
.route("/views/{name}", get(get_view).delete(delete_view))
// Phase E: soft-delete tombstones
.route("/datasets/by-name/{name}/tombstone", post(tombstone_rows).get(list_tombstones))
// Phase 17: model profiles
.route("/profiles", post(create_profile).get(list_profiles))
.route("/profiles/{id}", get(get_profile).delete(delete_profile))
.with_state(registry)
}
@ -338,3 +341,66 @@ async fn list_tombstones(
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// --- Phase 17: Model profiles ---
#[derive(Deserialize)]
struct CreateProfileRequest {
id: String,
ollama_name: String,
#[serde(default)]
description: String,
bound_datasets: Vec<String>,
#[serde(default)]
hnsw_config: shared::types::ProfileHnswConfig,
#[serde(default = "default_embed_model_req")]
embed_model: String,
#[serde(default)]
created_by: String,
}
fn default_embed_model_req() -> String { "nomic-embed-text".to_string() }
async fn create_profile(
State(registry): State<Registry>,
Json(req): Json<CreateProfileRequest>,
) -> impl IntoResponse {
let profile = shared::types::ModelProfile {
id: req.id,
ollama_name: req.ollama_name,
description: req.description,
bound_datasets: req.bound_datasets,
hnsw_config: req.hnsw_config,
embed_model: req.embed_model,
created_at: chrono::Utc::now(),
created_by: req.created_by,
};
match registry.put_profile(profile).await {
Ok(p) => Ok((StatusCode::CREATED, Json(p))),
Err(e) => Err((StatusCode::BAD_REQUEST, e)),
}
}
async fn list_profiles(State(registry): State<Registry>) -> impl IntoResponse {
Json(registry.list_profiles().await)
}
async fn get_profile(
State(registry): State<Registry>,
Path(id): Path<String>,
) -> impl IntoResponse {
match registry.get_profile(&id).await {
Some(p) => Ok(Json(p)),
None => Err((StatusCode::NOT_FOUND, format!("profile not found: {id}"))),
}
}
async fn delete_profile(
State(registry): State<Registry>,
Path(id): Path<String>,
) -> impl IntoResponse {
match registry.delete_profile(&id).await {
Ok(()) => Ok(StatusCode::NO_CONTENT),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

View File

@ -196,6 +196,69 @@ pub struct AiView {
pub description: String,
}
/// Per-model configuration for HNSW builds. Declared inline so the PRD
/// invariant 9 ("every reader gets its own profile") can carry tuning
/// preferences without forcing a round-trip to vectord's trial system.
/// Mirrors `vectord::trial::HnswConfig` to avoid a cross-crate dep cycle.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProfileHnswConfig {
pub ef_construction: usize,
pub ef_search: usize,
#[serde(default)]
pub seed: Option<u64>,
}
impl Default for ProfileHnswConfig {
/// Matches the production HNSW default from Phase 15 trial sweep
/// (see docs/ADR and vectord::trial::HnswConfig::default).
fn default() -> Self {
Self { ef_construction: 80, ef_search: 30, seed: None }
}
}
/// A named client of the substrate — a local LLM, an AI agent, or a
/// human role that should see a specific scoped view of the data.
/// Profiles are the first-class "every reader gets its own profile"
/// concept from PRD invariant 9.
///
/// Bound datasets can be raw catalog tables OR `AiView` names — either
/// works at query time since both register as DataFusion tables with
/// the same name. Mixing raw and views lets you compose access rules:
/// bind a model to `candidates_safe` (PII-redacted view) and `placements`
/// (raw table, no PII), and the model gets exactly what it needs.
///
/// Stored at `_catalog/profiles/{id}.json`. Activation (`/profile/{id}/
/// activate`) pre-loads the embedding cache and builds HNSW with the
/// profile's config — first search after activate is warm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelProfile {
/// Unique id — slug form (alphanumeric + `-`, `_`). Used in URLs,
/// bucket names (`profile:{id}`), and trial journal prefixes.
pub id: String,
/// Ollama model tag (e.g. "qwen2.5:7b", "nomic-embed-text:latest").
/// Used when this profile is the "generate" or "embed" caller.
pub ollama_name: String,
/// Free-text description surfaced to operators. Who/what is this
/// profile for?
#[serde(default)]
pub description: String,
/// Names of datasets or AiViews this profile can read. A search
/// against an index whose `source` isn't in this list is refused.
pub bound_datasets: Vec<String>,
/// Per-profile HNSW config, used when activate builds the graph.
#[serde(default)]
pub hnsw_config: ProfileHnswConfig,
/// Which embedding model this profile expects. Defaults to
/// `nomic-embed-text` (the only one currently installed).
#[serde(default = "default_embed_model")]
pub embed_model: String,
pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(default)]
pub created_by: String,
}
fn default_embed_model() -> String { "nomic-embed-text".to_string() }
/// Soft-delete marker (Phase E).
///
/// Tombstones live beside the dataset in `_catalog/tombstones/{dataset}/`

View File

@ -167,6 +167,11 @@ pub async fn refresh_index(
if new_docs == 0 {
tracing::info!("refresh '{}': no new docs to embed", dataset_name);
// Even on no-op, make sure index metadata is registered so
// downstream discovery (profile activation, A/B search) sees
// indexes built by earlier refresh calls that predated the
// auto-register behavior.
let _ = try_update_index_meta(index_registry, &req.index_name, existing.len()).await;
registry.clear_embeddings_stale(dataset_name).await?;
return Ok(RefreshResult {
index_name: req.index_name.clone(),
@ -263,8 +268,10 @@ pub async fn refresh_index(
})
}
/// Best-effort refresh of index registry metadata. If the index exists,
/// bump the chunk_count; if not, this is a no-op.
/// Best-effort refresh of index registry metadata.
/// - If the index already exists, bump the chunk_count.
/// - If it's brand new (first-time build via refresh), REGISTER it so
/// downstream discovery (profile activation, A/B search) can find it.
async fn try_update_index_meta(
index_registry: &IndexRegistry,
index_name: &str,
@ -272,8 +279,32 @@ async fn try_update_index_meta(
) -> Result<(), String> {
if let Some(mut meta) = index_registry.get(index_name).await {
meta.chunk_count = chunk_count;
index_registry.register(meta).await
} else {
Ok(())
return index_registry.register(meta).await;
}
// First-time registration — infer reasonable defaults. Convention:
// index name `{source}_v1` or `{source}_vN` implies a source dataset
// named by stripping the `_vN` suffix. Otherwise use the index name
// as the source and let the caller patch later.
let source = match index_name.rsplit_once('_') {
Some((base, suffix)) if suffix.starts_with('v') && suffix[1..].chars().all(|c| c.is_ascii_digit()) => {
base.to_string()
}
_ => index_name.to_string(),
};
let meta = crate::index_registry::IndexMeta {
index_name: index_name.to_string(),
source,
model_name: "nomic-embed-text".to_string(),
model_version: "latest".to_string(),
dimensions: 768,
chunk_count,
doc_count: chunk_count,
chunk_size: 500,
overlap: 50,
storage_key: format!("vectors/{index_name}.parquet"),
created_at: chrono::Utc::now(),
build_time_secs: 0.0,
chunks_per_sec: 0.0,
};
index_registry.register(meta).await
}

View File

@ -54,6 +54,10 @@ pub fn router(state: VectorState) -> Router {
// Phase C: embedding refresh
.route("/refresh/{dataset_name}", post(refresh_dataset))
.route("/stale", get(list_stale))
// Phase 17: profile activation — pre-load caches + HNSW for this
// model's bound data. First search after activate is warm.
.route("/profile/{id}/activate", post(activate_profile))
.route("/profile/{id}/search", post(profile_scoped_search))
.with_state(state)
}
@ -731,3 +735,187 @@ async fn list_stale(State(state): State<VectorState>) -> impl IntoResponse {
.collect();
Json(entries)
}
// --- Phase 17: Model profile activation + scoped search ---
#[derive(Serialize)]
struct ActivateReport {
profile_id: String,
ollama_name: String,
indexes_warmed: Vec<WarmedIndex>,
failures: Vec<String>,
total_vectors: usize,
duration_secs: f32,
}
#[derive(Serialize)]
struct WarmedIndex {
index_name: String,
source: String,
vectors: usize,
hnsw_build_secs: f32,
}
/// Warm this profile's indexes. For every bound dataset, find the
/// matching vector index (any index whose `source` equals the dataset
/// or view name), load its embeddings into EmbeddingCache, build HNSW
/// with the profile's config. Next `/profile/{id}/search` call is then
/// <1ms cold.
///
/// Failures on individual indexes don't stop the activation — they get
/// reported in the response. This matches the "substrate keeps working"
/// philosophy from ADR-017: one bad binding shouldn't take down the
/// whole profile.
async fn activate_profile(
State(state): State<VectorState>,
Path(profile_id): Path<String>,
) -> impl IntoResponse {
let t0 = std::time::Instant::now();
let profile = match state.catalog.get_profile(&profile_id).await {
Some(p) => p,
None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
};
let mut warmed = Vec::new();
let mut failures = Vec::new();
let mut total_vectors = 0usize;
let all_indexes = state.index_registry.list(None, None).await;
for binding in &profile.bound_datasets {
// Find every index whose source matches this binding.
let matched: Vec<_> = all_indexes
.iter()
.filter(|m| &m.source == binding)
.collect();
if matched.is_empty() {
failures.push(format!(
"no vector index found for binding '{}'", binding,
));
continue;
}
for meta in matched {
// Pre-load embeddings into cache.
let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await {
Ok(arc) => arc,
Err(e) => {
failures.push(format!("{}: load failed: {}", meta.index_name, e));
continue;
}
};
total_vectors += embeddings.len();
// Build HNSW with the profile's config.
let cfg = trial::HnswConfig {
ef_construction: profile.hnsw_config.ef_construction,
ef_search: profile.hnsw_config.ef_search,
seed: profile.hnsw_config.seed,
};
let build_t = std::time::Instant::now();
match state
.hnsw_store
.build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg)
.await
{
Ok(_) => {
warmed.push(WarmedIndex {
index_name: meta.index_name.clone(),
source: meta.source.clone(),
vectors: embeddings.len(),
hnsw_build_secs: build_t.elapsed().as_secs_f32(),
});
}
Err(e) => {
failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e));
}
}
}
}
Ok(Json(ActivateReport {
profile_id: profile.id,
ollama_name: profile.ollama_name,
indexes_warmed: warmed,
failures,
total_vectors,
duration_secs: t0.elapsed().as_secs_f32(),
}))
}
#[derive(Deserialize)]
struct ProfileSearchRequest {
index_name: String,
query: String,
top_k: Option<usize>,
}
/// Search scoped to a profile — refuses if the requested index's source
/// isn't in the profile's bound_datasets. Reuses the existing HNSW
/// search path when the index is warm; falls back to brute-force cosine
/// if it's not (handled by the existing search code path).
async fn profile_scoped_search(
State(state): State<VectorState>,
Path(profile_id): Path<String>,
Json(req): Json<ProfileSearchRequest>,
) -> impl IntoResponse {
let profile = match state.catalog.get_profile(&profile_id).await {
Some(p) => p,
None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
};
// Verify the index is in scope for this profile.
let index_meta = match state.index_registry.get(&req.index_name).await {
Some(m) => m,
None => return Err((StatusCode::NOT_FOUND, format!("index not found: {}", req.index_name))),
};
if !profile.bound_datasets.contains(&index_meta.source) {
return Err((
StatusCode::FORBIDDEN,
format!(
"profile '{}' is not bound to '{}' — allowed bindings: {:?}",
profile.id, index_meta.source, profile.bound_datasets,
),
));
}
let top_k = req.top_k.unwrap_or(5);
// Embed the query.
let embed_resp = state
.ai_client
.embed(EmbedRequest { texts: vec![req.query.clone()], model: None })
.await
.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
if embed_resp.embeddings.is_empty() {
return Err((StatusCode::BAD_GATEWAY, "no embedding returned".into()));
}
let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
// Prefer the hot HNSW path if the index is loaded, otherwise fall
// back to the brute-force path.
if state.hnsw_store.has_index(&req.index_name).await {
match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await {
Ok(hits) => Ok(Json(serde_json::json!({
"profile": profile.id,
"source": index_meta.source,
"method": "hnsw",
"results": hits,
}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
} else {
let embeddings = state
.embedding_cache
.get_or_load(&req.index_name)
.await
.map_err(|e| (StatusCode::NOT_FOUND, format!("embeddings: {e}")))?;
let results = search::search(&query_vec, &embeddings, top_k);
Ok(Json(serde_json::json!({
"profile": profile.id,
"source": index_meta.source,
"method": "brute_force",
"results": results,
})))
}
}

View File

@ -154,6 +154,15 @@
- `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack
- 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard
- Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling.
- [x] Phase 17: Model profiles + scoped search — 2026-04-16
- `shared::types::ModelProfile` — { id, ollama_name, description, bound_datasets, hnsw_config, embed_model, created_at, created_by }
- `shared::types::ProfileHnswConfig` — mirror of vectord's HnswConfig to avoid cross-crate dep cycle (defaults ec=80 es=30 matching Phase 15 winner)
- `Registry::{put_profile, get_profile, list_profiles, delete_profile}` persisted at `_catalog/profiles/{id}.json`, validates bindings exist (raw dataset OR AiView)
- Endpoints: `POST/GET /catalog/profiles`, `GET/DELETE /catalog/profiles/{id}`
- `POST /vectors/profile/{id}/activate` — warms EmbeddingCache + builds HNSW with profile's config for every bound dataset's vector index; reports warmed indexes + failures + duration
- `POST /vectors/profile/{id}/search` — rejects 403 if requested index's source isn't in profile.bound_datasets; falls through to HNSW if warm, brute-force otherwise
- Fixed refresh to register new index metadata (was silently no-op for first-time indexes)
- End-to-end: security-analyst profile bound to threat_intel → activate warms 54 vectors in 156ms → within-scope HNSW search works (0.625 score); out-of-scope search for candidates returns 403 with allowed bindings listed
- [x] Phase E: Soft deletes (tombstones) — 2026-04-16
- `shared::types::Tombstone` — { dataset, row_key_column, row_key_value, deleted_at, actor, reason }
- `catalogd::tombstones::TombstoneStore` per-dataset append-log at `_catalog/tombstones/{dataset}/`, flush_threshold=1 + explicit flush so every tombstone is durable on return (compliance requirement)