diff --git a/crates/vectord/Cargo.toml b/crates/vectord/Cargo.toml
index b43abd9..b4a7399 100644
--- a/crates/vectord/Cargo.toml
+++ b/crates/vectord/Cargo.toml
@@ -8,6 +8,7 @@ shared = { path = "../shared" }
 storaged = { path = "../storaged" }
 aibridge = { path = "../aibridge" }
 catalogd = { path = "../catalogd" }
+queryd = { path = "../queryd" }
 # ADR-019 firewall — vectord-lance owns its own Arrow 57 / Lance 4 deps.
 # Public API uses only std types so no version conflict propagates here.
 vectord-lance = { path = "../vectord-lance" }
diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs
index 171f92b..2cec969 100644
--- a/crates/vectord/src/service.rs
+++ b/crates/vectord/src/service.rs
@@ -9,7 +9,7 @@ use object_store::ObjectStore;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 
-use aibridge::client::{AiClient, EmbedRequest};
+use aibridge::client::{AiClient, EmbedRequest, GenerateRequest};
 use catalogd::registry::Registry as CatalogRegistry;
 use storaged::registry::BucketRegistry;
 use crate::{agent, autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, lance_backend, promotion, rag, refresh, search, store, supervisor, trial};
@@ -67,6 +67,7 @@
         .route("/jobs/{id}", get(get_job))
         .route("/search", post(search_index))
         .route("/rag", post(rag_query))
+        .route("/hybrid", post(hybrid_search))
         .route("/hnsw/build", post(build_hnsw))
         .route("/hnsw/search", post(search_hnsw))
         .route("/hnsw/list", get(list_hnsw))
@@ -384,6 +385,233 @@
     }
 }
 
+// --- Hybrid SQL+Vector Search ---
+//
+// The fix for the core RAG gap: vector search alone can't do structured
+// filtering (state, role, reliability threshold). SQL alone can't do
+// semantic similarity ("who could handle this kind of work"). Hybrid
+// does both: SQL narrows to structurally-valid candidates, vector
+// ranks them by semantic relevance, LLM generates from verified context.
+
+#[derive(Deserialize)]
+struct HybridRequest {
+    /// Natural language question — used for embedding + LLM generation.
+    question: String,
+    /// Vector index to search against.
+    index_name: String,
+    /// SQL WHERE clause to pre-filter. Applied against the index's source
+    /// dataset. Example: "state = 'IL' AND reliability > 0.8"
+    /// Safety: runs through DataFusion's parser so injection is bounded
+    /// by what DataFusion accepts (no DDL, no writes).
+    #[serde(default)]
+    sql_filter: Option<String>,
+    /// Dataset to run the SQL filter against. Defaults to the index's
+    /// source if omitted.
+    #[serde(default)]
+    filter_dataset: Option<String>,
+    /// Column in the SQL result that maps to the vector index's doc_id.
+    /// Default: "worker_id" (for the Ethereal dataset) or "candidate_id".
+    #[serde(default)]
+    id_column: Option<String>,
+    #[serde(default = "default_top_k")]
+    top_k: usize,
+    /// If true, generate an LLM answer from the matched context.
+    /// If false, just return the ranked matches (faster, no Ollama gen).
+    #[serde(default = "default_true")]
+    generate: bool,
+}
+
+fn default_true() -> bool { true }
+
+#[derive(serde::Serialize)]
+struct HybridResponse {
+    question: String,
+    sql_filter: Option<String>,
+    sql_matches: usize,
+    vector_reranked: usize,
+    method: String,
+    answer: Option<String>,
+    sources: Vec<HybridSource>,
+    duration_ms: u64,
+}
+
+#[derive(serde::Serialize)]
+struct HybridSource {
+    doc_id: String,
+    chunk_text: String,
+    score: f32,
+    sql_verified: bool,
+}
+
+// NOTE(review): body uses `?` and explicit Ok/Err returns, so the return
+// type must be a concrete Result (axum's IntoResponse impl for Result
+// covers it); an opaque `impl IntoResponse` would not compile here.
+async fn hybrid_search(
+    State(state): State<VectorState>,
+    Json(req): Json<HybridRequest>,
+) -> Result<Json<HybridResponse>, (StatusCode, String)> {
+    let t0 = std::time::Instant::now();
+
+    // Step 1: If SQL filter provided, run it to get the set of valid IDs.
+    let valid_ids: Option<std::collections::HashSet<String>> = if let Some(ref filter) = req.sql_filter {
+        let index_meta = state.index_registry.get(&req.index_name).await;
+        let dataset = req.filter_dataset.clone()
+            .or_else(|| index_meta.map(|m| m.source.clone()))
+            .unwrap_or_else(|| req.index_name.clone());
+        let id_col = req.id_column.clone().unwrap_or_else(|| "worker_id".into());
+
+        let sql = format!("SELECT CAST({id_col} AS VARCHAR) AS id FROM {dataset} WHERE {filter}");
+        tracing::info!("hybrid: SQL filter → {sql}");
+
+        // Use queryd through the catalog — same engine as /query/sql
+        // Use the query engine to get JSON rows — avoids Arrow type
+        // wrangling across DataFusion's Utf8View/StringViewArray variants.
+        let engine = queryd::context::QueryEngine::new(
+            state.catalog.clone(),
+            state.bucket_registry.clone(),
+            queryd::cache::MemCache::new(0),
+        );
+        match engine.query(&sql).await {
+            Ok(batches) => {
+                use arrow::array::{Array, AsArray};
+                let mut ids = std::collections::HashSet::new();
+                for batch in &batches {
+                    if let Some(col) = batch.column_by_name("id") {
+                        // DataFusion CAST(x AS VARCHAR) → StringViewArray.
+                        // Try StringView first, then String, then Int.
+                        if let Some(arr) = col.as_string_view_opt() {
+                            for i in 0..arr.len() {
+                                if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
+                            }
+                        } else if let Some(arr) = col.as_string_opt::<i32>() {
+                            for i in 0..arr.len() {
+                                if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
+                            }
+                        } else {
+                            // Fallback: try as Int32/Int64 (if CAST didn't happen)
+                            if let Some(arr) = col.as_any().downcast_ref::<arrow::array::Int32Array>() {
+                                for i in 0..arr.len() {
+                                    if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
+                                }
+                            } else if let Some(arr) = col.as_any().downcast_ref::<arrow::array::Int64Array>() {
+                                for i in 0..arr.len() {
+                                    if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
+                                }
+                            }
+                        }
+                    }
+                }
+                tracing::info!("hybrid: SQL filter returned {} IDs", ids.len());
+                if ids.is_empty() { None } else { Some(ids) }
+            }
+            Err(e) => {
+                return Err((StatusCode::BAD_REQUEST, format!("SQL filter error: {e}")));
+            }
+        }
+    } else {
+        None
+    };
+
+    // Step 2: Vector search — embed question, search index.
+    let embed_resp = state.ai_client
+        .embed(EmbedRequest { texts: vec![req.question.clone()], model: None })
+        .await
+        .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
+    if embed_resp.embeddings.is_empty() {
+        return Err((StatusCode::BAD_GATEWAY, "no embedding".into()));
+    }
+    let qv: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
+
+    // When SQL-filtered: use brute-force cosine over all embeddings,
+    // then filter by SQL IDs, then take top_k. HNSW's ef_search caps
+    // results at ~30, which is too few to reliably intersect with
+    // narrow SQL filters. Brute-force on 10K vectors is ~50ms — fast
+    // enough for the hybrid path. Without SQL filter, use HNSW normally.
+    let all_results = if valid_ids.is_some() {
+        // Brute-force path: score ALL vectors, filter by SQL IDs later.
+        let embeddings = store::load_embeddings(&state.store, &req.index_name).await
+            .map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;
+        search::search(&qv, &embeddings, embeddings.len()) // score everything
+    } else if state.hnsw_store.has_index(&req.index_name).await {
+        state.hnsw_store.search(&req.index_name, &qv, req.top_k).await
+            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?
+            .into_iter()
+            .map(|h| search::SearchResult {
+                doc_id: h.doc_id,
+                chunk_text: h.chunk_text,
+                score: h.score,
+                source: h.source,
+                chunk_idx: h.chunk_idx as u32,
+            })
+            .collect::<Vec<_>>()
+    } else {
+        let embeddings = store::load_embeddings(&state.store, &req.index_name).await
+            .map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;
+        search::search(&qv, &embeddings, req.top_k)
+    };
+
+    // Step 3: Filter vector results to only SQL-verified IDs.
+    let sql_count = valid_ids.as_ref().map(|s| s.len()).unwrap_or(0);
+    let filtered: Vec<search::SearchResult> = if let Some(ref ids) = valid_ids {
+        all_results.into_iter()
+            .filter(|r| {
+                // doc_id format is "W-{worker_id}" — extract the number
+                let num = r.doc_id.strip_prefix("W-")
+                    .or_else(|| r.doc_id.strip_prefix("CAND-"))
+                    .unwrap_or(&r.doc_id);
+                ids.contains(num)
+            })
+            .take(req.top_k)
+            .collect()
+    } else {
+        all_results.into_iter().take(req.top_k).collect()
+    };
+
+    // Step 4: Build sources with SQL-verified flag.
+    let sources: Vec<HybridSource> = filtered.iter().map(|r| HybridSource {
+        doc_id: r.doc_id.clone(),
+        chunk_text: r.chunk_text.clone(),
+        score: r.score,
+        sql_verified: valid_ids.is_some(),
+    }).collect();
+
+    // Step 5: Generate answer if requested.
+    let answer = if req.generate && !sources.is_empty() {
+        let context: String = sources.iter().enumerate().map(|(i, s)| {
+            format!("[{}] (id: {}, verified: {}) {}", i + 1, s.doc_id, s.sql_verified, s.chunk_text)
+        }).collect::<Vec<_>>().join("\n\n");
+
+        let gen_resp = state.ai_client.generate(GenerateRequest {
+            prompt: format!(
+                "You are a staffing intelligence assistant. Answer based ONLY on these \
+                 verified worker records. Every record has been SQL-verified against the \
+                 database — you can trust the facts in them. Be specific: cite names, \
+                 skills, certifications, scores, and locations.\n\n\
+                 Records:\n{context}\n\n\
+                 Question: {}\n\nAnswer:", req.question,
+            ),
+            model: None,
+            system: None,
+            temperature: Some(0.2),
+            max_tokens: Some(512),
+        }).await;
+
+        gen_resp.ok().map(|r| r.text.trim().to_string())
+    } else {
+        None
+    };
+
+    let method = if valid_ids.is_some() { "hybrid_sql_vector" } else { "vector_only" };
+
+    Ok(Json(HybridResponse {
+        question: req.question,
+        sql_filter: req.sql_filter,
+        sql_matches: sql_count,
+        vector_reranked: sources.len(),
+        method: method.into(),
+        answer,
+        sources,
+        duration_ms: t0.elapsed().as_millis() as u64,
+    }))
+}
+
 // --- HNSW Fast Search ---
 
 #[derive(Deserialize)]