Profile-driven Lance routing — vector_backend auto-routes search + activate

activate_profile: when profile.vector_backend == Lance, auto-migrates
from Parquet if no Lance dataset exists, auto-builds IVF_PQ if no
index attached. Reuses existing Lance dataset on subsequent activations.

profile_scoped_search: routes to Lance IVF_PQ or Parquet+HNSW based
on the profile's declared backend. Callers hit the same endpoint —
the profile abstracts which storage tier serves the query.

Verified: lance-recruiter (vector_backend=lance) and parquet-recruiter
(vector_backend=parquet) both searched the same 100K index through
POST /vectors/profile/{id}/search. Lance returned lance_ivf_pq at
25ms; Parquet returned hnsw at <1ms. Same API surface, different
backends, transparent routing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-16 20:40:43 -05:00
parent 7c1222d240
commit 17a0259cd0

View File

@ -902,9 +902,9 @@ async fn activate_profile(
} }
let all_indexes = state.index_registry.list(None, None).await; let all_indexes = state.index_registry.list(None, None).await;
let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance;
for binding in &profile.bound_datasets { for binding in &profile.bound_datasets {
// Find every index whose source matches this binding.
let matched: Vec<_> = all_indexes let matched: Vec<_> = all_indexes
.iter() .iter()
.filter(|m| &m.source == binding) .filter(|m| &m.source == binding)
@ -916,45 +916,106 @@ async fn activate_profile(
continue; continue;
} }
for meta in matched { for meta in matched {
// Pre-load embeddings into cache. if use_lance {
let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await { // --- Lance activation path ---
Ok(arc) => arc, // Ensure a Lance dataset exists for this index. If it
Err(e) => { // doesn't, auto-migrate from the Parquet blob. Then
failures.push(format!("{}: load failed: {}", meta.index_name, e)); // ensure an IVF_PQ index is built.
continue; let bucket = meta.bucket.clone();
} let lance_store = match state.lance.store_for_new(&meta.index_name, &bucket).await {
}; Ok(s) => s,
total_vectors += embeddings.len(); Err(e) => {
failures.push(format!("{}: lance store init: {e}", meta.index_name));
// Build HNSW with the index's PROMOTED config if one exists continue;
// (Phase 16 hot-swap), otherwise fall back to the profile's }
// declared defaults. This means autotune's winner is picked };
// up on the next activation automatically. let count = lance_store.count().await.unwrap_or(0);
let profile_default = trial::HnswConfig { if count == 0 {
ef_construction: profile.hnsw_config.ef_construction, // Auto-migrate from existing Parquet.
ef_search: profile.hnsw_config.ef_search, let pq_store = match state.bucket_registry.get(&bucket) {
seed: profile.hnsw_config.seed, Ok(s) => s,
}; Err(e) => { failures.push(format!("{}: bucket: {e}", meta.index_name)); continue; }
let cfg = state };
.promotion_registry match storaged::ops::get(&pq_store, &meta.storage_key).await {
.config_or(&meta.index_name, profile_default) Ok(bytes) => {
.await; let build_t = std::time::Instant::now();
let build_t = std::time::Instant::now(); match lance_store.migrate_from_parquet_bytes(&bytes).await {
match state Ok(ms) => {
.hnsw_store total_vectors += ms.rows_written;
.build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg) tracing::info!(
.await "lance auto-migrate '{}': {} rows in {:.2}s",
{ meta.index_name, ms.rows_written, ms.duration_secs,
Ok(_) => { );
warmed.push(WarmedIndex {
index_name: meta.index_name.clone(),
source: meta.source.clone(),
vectors: ms.rows_written,
hnsw_build_secs: build_t.elapsed().as_secs_f32(),
});
}
Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)),
}
}
Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)),
}
} else {
total_vectors += count;
warmed.push(WarmedIndex { warmed.push(WarmedIndex {
index_name: meta.index_name.clone(), index_name: meta.index_name.clone(),
source: meta.source.clone(), source: meta.source.clone(),
vectors: embeddings.len(), vectors: count,
hnsw_build_secs: build_t.elapsed().as_secs_f32(), hnsw_build_secs: 0.0,
}); });
} }
Err(e) => { // Ensure IVF_PQ index exists.
failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)); if !lance_store.has_vector_index().await.unwrap_or(false) {
let build_t = std::time::Instant::now();
match lance_store.build_index(316, 8, 48).await {
Ok(ix) => tracing::info!(
"lance auto-index '{}': built in {:.1}s",
meta.index_name, ix.build_time_secs,
),
Err(e) => failures.push(format!("{}: lance index build: {e}", meta.index_name)),
}
let _ = build_t; // suppress unused warning
}
} else {
// --- Parquet + HNSW activation path (existing) ---
let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await {
Ok(arc) => arc,
Err(e) => {
failures.push(format!("{}: load failed: {}", meta.index_name, e));
continue;
}
};
total_vectors += embeddings.len();
let profile_default = trial::HnswConfig {
ef_construction: profile.hnsw_config.ef_construction,
ef_search: profile.hnsw_config.ef_search,
seed: profile.hnsw_config.seed,
};
let cfg = state
.promotion_registry
.config_or(&meta.index_name, profile_default)
.await;
let build_t = std::time::Instant::now();
match state
.hnsw_store
.build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg)
.await
{
Ok(_) => {
warmed.push(WarmedIndex {
index_name: meta.index_name.clone(),
source: meta.source.clone(),
vectors: embeddings.len(),
hnsw_build_secs: build_t.elapsed().as_secs_f32(),
});
}
Err(e) => {
failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e));
}
} }
} }
} }
@ -1074,6 +1135,7 @@ async fn profile_scoped_search(
} }
let top_k = req.top_k.unwrap_or(5); let top_k = req.top_k.unwrap_or(5);
let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance;
// Embed the query. // Embed the query.
let embed_resp = state let embed_resp = state
@ -1086,9 +1148,24 @@ async fn profile_scoped_search(
} }
let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect(); let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
// Prefer the hot HNSW path if the index is loaded, otherwise fall // ADR-019 hybrid: route to Lance or Parquet+HNSW based on the
// back to the brute-force path. // profile's declared backend. Callers don't need to know which
if state.hnsw_store.has_index(&req.index_name).await { // storage tier they're hitting — the profile abstracts it.
if use_lance {
let lance_store = state.lance.store_for(&req.index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let t0 = std::time::Instant::now();
match lance_store.search(&query_vec, top_k).await {
Ok(hits) => Ok(Json(serde_json::json!({
"profile": profile.id,
"source": index_meta.source,
"method": "lance_ivf_pq",
"latency_us": t0.elapsed().as_micros() as u64,
"results": hits,
}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
} else if state.hnsw_store.has_index(&req.index_name).await {
match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await { match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await {
Ok(hits) => Ok(Json(serde_json::json!({ Ok(hits) => Ok(Json(serde_json::json!({
"profile": profile.id, "profile": profile.id,