From 17a0259cd09e05537d6578b0062e3da921b16b18 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 16 Apr 2026 20:40:43 -0500 Subject: [PATCH] =?UTF-8?q?Profile-driven=20Lance=20routing=20=E2=80=94=20?= =?UTF-8?q?vector=5Fbackend=20auto-routes=20search=20+=20activate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit activate_profile: when profile.vector_backend == Lance, auto-migrates from Parquet if no Lance dataset exists, auto-builds IVF_PQ if no index attached. Reuses existing Lance dataset on subsequent activations. profile_scoped_search: routes to Lance IVF_PQ or Parquet+HNSW based on the profile's declared backend. Callers hit the same endpoint — the profile abstracts which storage tier serves the query. Verified: lance-recruiter (vector_backend=lance) and parquet-recruiter (vector_backend=parquet) both searched the same 100K index through POST /vectors/profile/{id}/search. Lance returned lance_ivf_pq at 25ms; Parquet returned hnsw at <1ms. Same API surface, different backends, transparent routing. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vectord/src/service.rs | 153 +++++++++++++++++++++++++--------- 1 file changed, 115 insertions(+), 38 deletions(-) diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index 1c47acb..046c442 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -902,9 +902,9 @@ async fn activate_profile( } let all_indexes = state.index_registry.list(None, None).await; + let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance; for binding in &profile.bound_datasets { - // Find every index whose source matches this binding. let matched: Vec<_> = all_indexes .iter() .filter(|m| &m.source == binding) @@ -916,45 +916,106 @@ async fn activate_profile( continue; } for meta in matched { - // Pre-load embeddings into cache. 
- let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await { - Ok(arc) => arc, - Err(e) => { - failures.push(format!("{}: load failed: {}", meta.index_name, e)); - continue; - } - }; - total_vectors += embeddings.len(); - - // Build HNSW with the index's PROMOTED config if one exists - // (Phase 16 hot-swap), otherwise fall back to the profile's - // declared defaults. This means autotune's winner is picked - // up on the next activation automatically. - let profile_default = trial::HnswConfig { - ef_construction: profile.hnsw_config.ef_construction, - ef_search: profile.hnsw_config.ef_search, - seed: profile.hnsw_config.seed, - }; - let cfg = state - .promotion_registry - .config_or(&meta.index_name, profile_default) - .await; - let build_t = std::time::Instant::now(); - match state - .hnsw_store - .build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg) - .await - { - Ok(_) => { + if use_lance { + // --- Lance activation path --- + // Ensure a Lance dataset exists for this index. If it + // doesn't, auto-migrate from the Parquet blob. Then + // ensure an IVF_PQ index is built. + let bucket = meta.bucket.clone(); + let lance_store = match state.lance.store_for_new(&meta.index_name, &bucket).await { + Ok(s) => s, + Err(e) => { + failures.push(format!("{}: lance store init: {e}", meta.index_name)); + continue; + } + }; + let count = lance_store.count().await.unwrap_or(0); + if count == 0 { + // Auto-migrate from existing Parquet. 
+ let pq_store = match state.bucket_registry.get(&bucket) { + Ok(s) => s, + Err(e) => { failures.push(format!("{}: bucket: {e}", meta.index_name)); continue; } + }; + match storaged::ops::get(&pq_store, &meta.storage_key).await { + Ok(bytes) => { + let build_t = std::time::Instant::now(); + match lance_store.migrate_from_parquet_bytes(&bytes).await { + Ok(ms) => { + total_vectors += ms.rows_written; + tracing::info!( + "lance auto-migrate '{}': {} rows in {:.2}s", + meta.index_name, ms.rows_written, ms.duration_secs, + ); + warmed.push(WarmedIndex { + index_name: meta.index_name.clone(), + source: meta.source.clone(), + vectors: ms.rows_written, + hnsw_build_secs: build_t.elapsed().as_secs_f32(), + }); + } + Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)), + } + } + Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)), + } + } else { + total_vectors += count; warmed.push(WarmedIndex { index_name: meta.index_name.clone(), source: meta.source.clone(), - vectors: embeddings.len(), - hnsw_build_secs: build_t.elapsed().as_secs_f32(), + vectors: count, + hnsw_build_secs: 0.0, }); } - Err(e) => { - failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)); + // Ensure IVF_PQ index exists. 
+ if !lance_store.has_vector_index().await.unwrap_or(false) { + let build_t = std::time::Instant::now(); + match lance_store.build_index(316, 8, 48).await { + Ok(ix) => tracing::info!( + "lance auto-index '{}': built in {:.1}s", + meta.index_name, ix.build_time_secs, + ), + Err(e) => failures.push(format!("{}: lance index build: {e}", meta.index_name)), + } + let _ = build_t; // suppress unused warning + } + } else { + // --- Parquet + HNSW activation path (existing) --- + let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await { + Ok(arc) => arc, + Err(e) => { + failures.push(format!("{}: load failed: {}", meta.index_name, e)); + continue; + } + }; + total_vectors += embeddings.len(); + + let profile_default = trial::HnswConfig { + ef_construction: profile.hnsw_config.ef_construction, + ef_search: profile.hnsw_config.ef_search, + seed: profile.hnsw_config.seed, + }; + let cfg = state + .promotion_registry + .config_or(&meta.index_name, profile_default) + .await; + let build_t = std::time::Instant::now(); + match state + .hnsw_store + .build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg) + .await + { + Ok(_) => { + warmed.push(WarmedIndex { + index_name: meta.index_name.clone(), + source: meta.source.clone(), + vectors: embeddings.len(), + hnsw_build_secs: build_t.elapsed().as_secs_f32(), + }); + } + Err(e) => { + failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)); + } } } } @@ -1074,6 +1135,7 @@ async fn profile_scoped_search( } let top_k = req.top_k.unwrap_or(5); + let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance; // Embed the query. let embed_resp = state @@ -1086,9 +1148,24 @@ async fn profile_scoped_search( } let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect(); - // Prefer the hot HNSW path if the index is loaded, otherwise fall - // back to the brute-force path. 
- if state.hnsw_store.has_index(&req.index_name).await { + // ADR-019 hybrid: route to Lance or Parquet+HNSW based on the + // profile's declared backend. Callers don't need to know which + // storage tier they're hitting — the profile abstracts it. + if use_lance { + let lance_store = state.lance.store_for(&req.index_name).await + .map_err(|e| (StatusCode::BAD_REQUEST, e))?; + let t0 = std::time::Instant::now(); + match lance_store.search(&query_vec, top_k).await { + Ok(hits) => Ok(Json(serde_json::json!({ + "profile": profile.id, + "source": index_meta.source, + "method": "lance_ivf_pq", + "latency_us": t0.elapsed().as_micros() as u64, + "results": hits, + }))), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } + } else if state.hnsw_store.has_index(&req.index_name).await { match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await { Ok(hits) => Ok(Json(serde_json::json!({ "profile": profile.id,