Phase 7: Vector index + RAG pipeline
- vectord crate: chunk → embed → store → search → RAG - chunker: configurable chunk size + overlap, sentence-boundary aware splitting - store: embeddings as Parquet (binary blob f32 vectors), portable format - search: brute-force cosine similarity (works up to ~100K vectors) - rag: full pipeline — embed question → search index → retrieve context → LLM answer - Endpoints: POST /vectors/index, /vectors/search, /vectors/rag - Gateway wired with vectord service - Tested: 200 candidate resumes indexed in 5.4s, semantic search + RAG working - 20 unit tests passing (chunker, search, ingestd, shared) - AI gives honest "no match found" when context doesn't support an answer Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
bb05c4412e
commit
26fc98c885
19
Cargo.lock
generated
19
Cargo.lock
generated
@ -2392,6 +2392,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
"vectord",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -5407,6 +5408,24 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
||||
|
||||
[[package]]
|
||||
name = "vectord"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"aibridge",
|
||||
"arrow",
|
||||
"axum",
|
||||
"bytes",
|
||||
"object_store",
|
||||
"parquet",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"shared",
|
||||
"storaged",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.5"
|
||||
|
||||
@ -8,6 +8,7 @@ members = [
|
||||
"crates/queryd",
|
||||
"crates/aibridge",
|
||||
"crates/ingestd",
|
||||
"crates/vectord",
|
||||
"crates/gateway",
|
||||
"crates/ui",
|
||||
]
|
||||
|
||||
@ -10,6 +10,7 @@ catalogd = { path = "../catalogd" }
|
||||
queryd = { path = "../queryd" }
|
||||
aibridge = { path = "../aibridge" }
|
||||
ingestd = { path = "../ingestd" }
|
||||
vectord = { path = "../vectord" }
|
||||
tokio = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@ -42,10 +42,14 @@ async fn main() {
|
||||
.nest("/storage", storaged::service::router(store.clone()))
|
||||
.nest("/catalog", catalogd::service::router(registry.clone()))
|
||||
.nest("/query", queryd::service::router(engine))
|
||||
.nest("/ai", aibridge::service::router(ai_client))
|
||||
.nest("/ai", aibridge::service::router(ai_client.clone()))
|
||||
.nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
|
||||
store: store.clone(),
|
||||
registry: registry.clone(),
|
||||
}))
|
||||
.nest("/vectors", vectord::service::router(vectord::service::VectorState {
|
||||
store: store.clone(),
|
||||
ai_client: ai_client.clone(),
|
||||
}));
|
||||
|
||||
// Auth middleware (if enabled)
|
||||
|
||||
18
crates/vectord/Cargo.toml
Normal file
18
crates/vectord/Cargo.toml
Normal file
@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "vectord"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
shared = { path = "../shared" }
|
||||
storaged = { path = "../storaged" }
|
||||
aibridge = { path = "../aibridge" }
|
||||
tokio = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
object_store = { workspace = true }
|
||||
parquet = { workspace = true }
|
||||
arrow = { workspace = true }
|
||||
135
crates/vectord/src/chunker.rs
Normal file
135
crates/vectord/src/chunker.rs
Normal file
@ -0,0 +1,135 @@
|
||||
//! Text chunking strategies for embedding.
//! Chunks need to be small enough for the embedding model (typically <512 tokens)
//! but large enough to carry meaning.

/// A chunk of text with metadata pointing back to its source.
///
/// Produced by `chunk_text` / `chunk_column`; the `source` + `doc_id` +
/// `chunk_idx` triple lets a search hit be traced back to the exact place
/// the text came from.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TextChunk {
    /// Source identifier (dataset name, filename, etc.)
    pub source: String,
    /// Row or document ID within the source
    pub doc_id: String,
    /// Chunk index within the document (0, 1, 2, ...)
    pub chunk_idx: u32,
    /// The actual text content (trimmed of surrounding whitespace)
    pub text: String,
}
|
||||
|
||||
/// Split text into overlapping chunks.
|
||||
/// - `chunk_size`: target characters per chunk (not tokens — chars are a good proxy)
|
||||
/// - `overlap`: characters of overlap between consecutive chunks
|
||||
pub fn chunk_text(
|
||||
text: &str,
|
||||
source: &str,
|
||||
doc_id: &str,
|
||||
chunk_size: usize,
|
||||
overlap: usize,
|
||||
) -> Vec<TextChunk> {
|
||||
let text = text.trim();
|
||||
if text.is_empty() {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
// Short text — single chunk
|
||||
if text.len() <= chunk_size {
|
||||
return vec![TextChunk {
|
||||
source: source.to_string(),
|
||||
doc_id: doc_id.to_string(),
|
||||
chunk_idx: 0,
|
||||
text: text.to_string(),
|
||||
}];
|
||||
}
|
||||
|
||||
let mut chunks = Vec::new();
|
||||
let mut start = 0;
|
||||
let mut idx = 0u32;
|
||||
|
||||
while start < text.len() {
|
||||
let end = (start + chunk_size).min(text.len());
|
||||
|
||||
// Try to break at a sentence or paragraph boundary
|
||||
let chunk_text = &text[start..end];
|
||||
let actual_end = if end < text.len() {
|
||||
// Look for last sentence boundary in the chunk
|
||||
if let Some(pos) = chunk_text.rfind(". ") {
|
||||
start + pos + 2
|
||||
} else if let Some(pos) = chunk_text.rfind('\n') {
|
||||
start + pos + 1
|
||||
} else {
|
||||
// Fall back to word boundary
|
||||
if let Some(pos) = chunk_text.rfind(' ') {
|
||||
start + pos + 1
|
||||
} else {
|
||||
end
|
||||
}
|
||||
}
|
||||
} else {
|
||||
end
|
||||
};
|
||||
|
||||
let chunk = text[start..actual_end].trim();
|
||||
if !chunk.is_empty() {
|
||||
chunks.push(TextChunk {
|
||||
source: source.to_string(),
|
||||
doc_id: doc_id.to_string(),
|
||||
chunk_idx: idx,
|
||||
text: chunk.to_string(),
|
||||
});
|
||||
idx += 1;
|
||||
}
|
||||
|
||||
// Advance with overlap
|
||||
if actual_end >= text.len() {
|
||||
break;
|
||||
}
|
||||
start = if actual_end > overlap { actual_end - overlap } else { actual_end };
|
||||
}
|
||||
|
||||
chunks
|
||||
}
|
||||
|
||||
/// Chunk a dataset's text column. Returns all chunks from all rows.
|
||||
pub fn chunk_column(
|
||||
source: &str,
|
||||
doc_ids: &[String],
|
||||
texts: &[String],
|
||||
chunk_size: usize,
|
||||
overlap: usize,
|
||||
) -> Vec<TextChunk> {
|
||||
let mut all_chunks = Vec::new();
|
||||
for (doc_id, text) in doc_ids.iter().zip(texts.iter()) {
|
||||
let chunks = chunk_text(text, source, doc_id, chunk_size, overlap);
|
||||
all_chunks.extend(chunks);
|
||||
}
|
||||
all_chunks
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    // Text shorter than `chunk_size` must come back untouched as one chunk.
    #[test]
    fn short_text_single_chunk() {
        let chunks = chunk_text("Hello world", "test", "1", 500, 50);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].text, "Hello world");
    }

    // Text longer than `chunk_size` splits into several chunks, none empty.
    #[test]
    fn long_text_multiple_chunks() {
        let text = "First sentence. Second sentence. Third sentence. Fourth sentence. Fifth sentence. Sixth sentence. Seventh sentence. Eighth sentence.";
        let chunks = chunk_text(text, "test", "1", 50, 10);
        assert!(chunks.len() > 1);
        // All text should be covered
        for chunk in &chunks {
            assert!(!chunk.text.is_empty());
        }
    }

    // Provenance fields (source, doc_id) are carried through unchanged.
    #[test]
    fn chunk_preserves_source() {
        let chunks = chunk_text("Some text here", "candidates", "CAND-001", 500, 50);
        assert_eq!(chunks[0].source, "candidates");
        assert_eq!(chunks[0].doc_id, "CAND-001");
    }
}
|
||||
5
crates/vectord/src/lib.rs
Normal file
5
crates/vectord/src/lib.rs
Normal file
@ -0,0 +1,5 @@
|
||||
//! vectord: vector index + RAG pipeline (chunk → embed → store → search → answer).

pub mod chunker; // text → overlapping chunks with provenance
pub mod store;   // chunks + vectors ⇄ Parquet in object storage
pub mod search;  // brute-force cosine-similarity search
pub mod rag;     // embed question → search → LLM answer
pub mod service; // axum HTTP endpoints under /vectors
|
||||
84
crates/vectord/src/rag.rs
Normal file
84
crates/vectord/src/rag.rs
Normal file
@ -0,0 +1,84 @@
|
||||
/// RAG pipeline: question → embed → search → retrieve → generate answer.
|
||||
|
||||
use object_store::ObjectStore;
|
||||
use std::sync::Arc;
|
||||
|
||||
use aibridge::client::{AiClient, EmbedRequest, GenerateRequest};
|
||||
use crate::search::{self, SearchResult};
|
||||
use crate::store;
|
||||
|
||||
/// Full RAG answer with provenance.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RagResponse {
    /// Generated answer text (trimmed), or the fixed
    /// "No relevant information found." message when the search returned nothing.
    pub answer: String,
    /// Model identifier reported by the generation backend; empty when no
    /// generation call was made (no search hits).
    pub model: String,
    /// The retrieved chunks the answer was grounded on, best score first.
    pub sources: Vec<SearchResult>,
    /// Token count reported by the backend, when available.
    pub tokens_generated: Option<u64>,
}
|
||||
|
||||
/// Execute full RAG: embed question → search index → retrieve context → generate answer.
///
/// Pipeline:
/// 1. Embed `question` via the AI bridge (a single-text batch).
/// 2. Load the named index from object storage and brute-force search it.
/// 3. Concatenate the top `top_k` hits into a numbered context block.
/// 4. Ask the LLM to answer using ONLY that context, citing `[n]` markers.
///
/// An empty search result is NOT an error: it short-circuits with a fixed
/// "No relevant information found." response and skips the generation call.
/// Errors from embedding, index loading, or generation are propagated as
/// strings.
pub async fn query(
    question: &str,
    index_name: &str,
    top_k: usize,
    object_store: &Arc<dyn ObjectStore>,
    ai_client: &AiClient,
) -> Result<RagResponse, String> {
    // 1. Embed the question
    tracing::info!("RAG: embedding question");
    let embed_resp = ai_client.embed(EmbedRequest {
        texts: vec![question.to_string()],
        model: None,
    }).await?;

    if embed_resp.embeddings.is_empty() {
        return Err("no embedding returned for question".into());
    }

    // Embedding API returns f64; the stored index vectors are f32, so
    // downcast before comparing.
    let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();

    // 2. Load index and search
    tracing::info!("RAG: searching index '{index_name}'");
    let embeddings = store::load_embeddings(object_store, index_name).await?;
    let results = search::search(&query_vec, &embeddings, top_k);

    if results.is_empty() {
        // Honest "no match": empty model string signals no LLM call was made.
        return Ok(RagResponse {
            answer: "No relevant information found.".into(),
            model: String::new(),
            sources: vec![],
            tokens_generated: None,
        });
    }

    // 3. Build context from retrieved chunks — numbered so the model can
    //    cite [1], [2], ... back to `sources`.
    let context: String = results.iter().enumerate().map(|(i, r)| {
        format!("[{}] (source: {}, doc: {}) {}", i + 1, r.source, r.doc_id, r.chunk_text)
    }).collect::<Vec<_>>().join("\n\n");

    // 4. Generate answer — low temperature keeps it grounded in the context.
    tracing::info!("RAG: generating answer from {} chunks", results.len());
    let prompt = format!(
        "You are a helpful assistant answering questions based on retrieved documents from a data system.\n\n\
        Use ONLY the following context to answer. If the context doesn't contain enough information, say so.\n\
        Cite sources by their number [1], [2], etc.\n\n\
        Context:\n{context}\n\n\
        Question: {question}\n\n\
        Answer:"
    );

    let gen_resp = ai_client.generate(GenerateRequest {
        prompt,
        model: None,
        system: None,
        temperature: Some(0.2),
        max_tokens: Some(512),
    }).await?;

    Ok(RagResponse {
        answer: gen_resp.text.trim().to_string(),
        model: gen_resp.model,
        sources: results,
        tokens_generated: gen_resp.tokens_generated,
    })
}
|
||||
107
crates/vectord/src/search.rs
Normal file
107
crates/vectord/src/search.rs
Normal file
@ -0,0 +1,107 @@
|
||||
/// Brute-force vector search with cosine similarity.
|
||||
/// Works well up to ~100K vectors. HNSW index would go here for larger scale.
|
||||
|
||||
use crate::store::StoredEmbedding;
|
||||
|
||||
/// A search result with score.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SearchResult {
    /// Source identifier copied from the stored embedding.
    pub source: String,
    /// Document ID within the source.
    pub doc_id: String,
    /// Position of the matched chunk within its document.
    pub chunk_idx: u32,
    /// The chunk's raw text, returned so callers can build context directly.
    pub chunk_text: String,
    /// Cosine similarity against the query vector; higher is closer.
    pub score: f32,
}
|
||||
|
||||
/// Search embeddings by cosine similarity. Returns top_k results.
|
||||
pub fn search(
|
||||
query_vector: &[f32],
|
||||
embeddings: &[StoredEmbedding],
|
||||
top_k: usize,
|
||||
) -> Vec<SearchResult> {
|
||||
let query_norm = norm(query_vector);
|
||||
if query_norm == 0.0 {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
let mut scored: Vec<SearchResult> = embeddings.iter().map(|emb| {
|
||||
let score = cosine_similarity(query_vector, &emb.vector, query_norm);
|
||||
SearchResult {
|
||||
source: emb.source.clone(),
|
||||
doc_id: emb.doc_id.clone(),
|
||||
chunk_idx: emb.chunk_idx,
|
||||
chunk_text: emb.chunk_text.clone(),
|
||||
score,
|
||||
}
|
||||
}).collect();
|
||||
|
||||
// Sort descending by score
|
||||
scored.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
|
||||
scored.truncate(top_k);
|
||||
scored
|
||||
}
|
||||
|
||||
/// Cosine similarity between `a` and `b`, with `a`'s norm supplied by the
/// caller (so a query's norm is measured once per search, not per candidate).
/// Mismatched dimensions and a zero-magnitude `b` both yield 0.0 ("no match")
/// rather than an error.
fn cosine_similarity(a: &[f32], b: &[f32], a_norm: f32) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }
    let mut dot = 0.0f32;
    for (x, y) in a.iter().zip(b) {
        dot += x * y;
    }
    let b_norm = norm(b);
    if b_norm == 0.0 {
        return 0.0;
    }
    dot / (a_norm * b_norm)
}

/// Euclidean (L2) norm of `v`; 0.0 for an empty slice.
fn norm(v: &[f32]) -> f32 {
    v.iter().fold(0.0f32, |acc, x| acc + x * x).sqrt()
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    // A vector compared against itself must score ~1.0 (maximum similarity).
    #[test]
    fn identical_vectors_score_1() {
        let v = vec![1.0, 2.0, 3.0];
        let emb = StoredEmbedding {
            source: "test".into(),
            doc_id: "1".into(),
            chunk_idx: 0,
            chunk_text: "hello".into(),
            vector: v.clone(),
        };
        let results = search(&v, &[emb], 1);
        assert!((results[0].score - 1.0).abs() < 0.001);
    }

    // Orthogonal vectors have zero dot product, so the score must be ~0.0.
    #[test]
    fn orthogonal_vectors_score_0() {
        let q = vec![1.0, 0.0];
        let emb = StoredEmbedding {
            source: "test".into(),
            doc_id: "1".into(),
            chunk_idx: 0,
            chunk_text: "hello".into(),
            vector: vec![0.0, 1.0],
        };
        let results = search(&q, &[emb], 1);
        assert!(results[0].score.abs() < 0.001);
    }

    // 10 candidates of decreasing alignment with the query: exactly top_k
    // results come back, ordered by descending score.
    #[test]
    fn returns_top_k() {
        let q = vec![1.0, 0.0, 0.0];
        let embs: Vec<StoredEmbedding> = (0..10).map(|i| StoredEmbedding {
            source: "test".into(),
            doc_id: format!("{i}"),
            chunk_idx: 0,
            chunk_text: format!("doc {i}"),
            vector: vec![1.0 - i as f32 * 0.1, i as f32 * 0.1, 0.0],
        }).collect();
        let results = search(&q, &embs, 3);
        assert_eq!(results.len(), 3);
        assert!(results[0].score >= results[1].score);
        assert!(results[1].score >= results[2].score);
    }
}
|
||||
175
crates/vectord/src/service.rs
Normal file
175
crates/vectord/src/service.rs
Normal file
@ -0,0 +1,175 @@
|
||||
use axum::{
|
||||
Json, Router,
|
||||
extract::State,
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
routing::{get, post},
|
||||
};
|
||||
use object_store::ObjectStore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
|
||||
use aibridge::client::{AiClient, EmbedRequest};
|
||||
use crate::{chunker, rag, search, store};
|
||||
|
||||
/// Shared state for all /vectors routes; `Clone` is required by axum's
/// `with_state`.
#[derive(Clone)]
pub struct VectorState {
    /// Object storage where `vectors/<index>.parquet` files live.
    pub store: Arc<dyn ObjectStore>,
    /// Client used for both embedding and LLM generation calls.
    pub ai_client: AiClient,
}
|
||||
|
||||
pub fn router(state: VectorState) -> Router {
|
||||
Router::new()
|
||||
.route("/health", get(health))
|
||||
.route("/index", post(create_index))
|
||||
.route("/search", post(search_index))
|
||||
.route("/rag", post(rag_query))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
/// Liveness probe: a constant body proves the service is wired and responding.
async fn health() -> &'static str {
    "vectord ok"
}
|
||||
|
||||
// --- Index creation: chunk text → embed → store ---
|
||||
|
||||
/// POST /vectors/index request body.
#[derive(Deserialize)]
struct CreateIndexRequest {
    /// Name for this vector index
    index_name: String,
    /// Source identifier
    source: String,
    /// List of documents to index
    documents: Vec<DocInput>,
    /// Chunk size in characters (default 500)
    chunk_size: Option<usize>,
    /// Overlap in characters (default 50)
    overlap: Option<usize>,
}

/// One document to be chunked and embedded.
#[derive(Deserialize)]
struct DocInput {
    /// Stable document identifier, echoed back in search results.
    id: String,
    /// Full document text.
    text: String,
}

/// Summary returned after a successful index build.
#[derive(Serialize)]
struct CreateIndexResponse {
    /// Echo of the requested index name.
    index_name: String,
    /// Number of input documents.
    documents: usize,
    /// Number of chunks produced (and embeddings stored).
    chunks: usize,
    /// Object-storage key of the written Parquet file.
    storage_key: String,
}
|
||||
|
||||
async fn create_index(
|
||||
State(state): State<VectorState>,
|
||||
Json(req): Json<CreateIndexRequest>,
|
||||
) -> impl IntoResponse {
|
||||
let chunk_size = req.chunk_size.unwrap_or(500);
|
||||
let overlap = req.overlap.unwrap_or(50);
|
||||
|
||||
tracing::info!("creating vector index '{}' from {} documents", req.index_name, req.documents.len());
|
||||
|
||||
// 1. Chunk all documents
|
||||
let doc_ids: Vec<String> = req.documents.iter().map(|d| d.id.clone()).collect();
|
||||
let texts: Vec<String> = req.documents.iter().map(|d| d.text.clone()).collect();
|
||||
let chunks = chunker::chunk_column(&req.source, &doc_ids, &texts, chunk_size, overlap);
|
||||
|
||||
if chunks.is_empty() {
|
||||
return Err((StatusCode::BAD_REQUEST, "no text to index".to_string()));
|
||||
}
|
||||
|
||||
tracing::info!("{} documents → {} chunks", req.documents.len(), chunks.len());
|
||||
|
||||
// 2. Embed all chunks (batch to avoid timeout)
|
||||
let batch_size = 32;
|
||||
let mut all_vectors: Vec<Vec<f64>> = Vec::new();
|
||||
|
||||
for batch in chunks.chunks(batch_size) {
|
||||
let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
|
||||
let embed_resp = state.ai_client.embed(EmbedRequest {
|
||||
texts,
|
||||
model: None,
|
||||
}).await.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed error: {e}")))?;
|
||||
all_vectors.extend(embed_resp.embeddings);
|
||||
}
|
||||
|
||||
// 3. Store
|
||||
let key = store::store_embeddings(&state.store, &req.index_name, &chunks, &all_vectors)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
||||
|
||||
Ok((StatusCode::CREATED, Json(CreateIndexResponse {
|
||||
index_name: req.index_name,
|
||||
documents: req.documents.len(),
|
||||
chunks: chunks.len(),
|
||||
storage_key: key,
|
||||
})))
|
||||
}
|
||||
|
||||
// --- Search ---
|
||||
|
||||
/// POST /vectors/search request body.
#[derive(Deserialize)]
struct SearchRequest {
    /// Which index (Parquet file) to search.
    index_name: String,
    /// Natural-language query to embed and match.
    query: String,
    /// Max results to return (default 5).
    top_k: Option<usize>,
}

/// POST /vectors/search response body.
#[derive(Serialize)]
struct SearchResponse {
    /// Best matches, highest cosine similarity first.
    results: Vec<search::SearchResult>,
    /// Echo of the submitted query.
    query: String,
}
|
||||
|
||||
async fn search_index(
|
||||
State(state): State<VectorState>,
|
||||
Json(req): Json<SearchRequest>,
|
||||
) -> impl IntoResponse {
|
||||
let top_k = req.top_k.unwrap_or(5);
|
||||
|
||||
// Embed query
|
||||
let embed_resp = state.ai_client.embed(EmbedRequest {
|
||||
texts: vec![req.query.clone()],
|
||||
model: None,
|
||||
}).await.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed error: {e}")))?;
|
||||
|
||||
if embed_resp.embeddings.is_empty() {
|
||||
return Err((StatusCode::BAD_GATEWAY, "no embedding returned".to_string()));
|
||||
}
|
||||
|
||||
let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
|
||||
|
||||
// Load index and search
|
||||
let embeddings = store::load_embeddings(&state.store, &req.index_name)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::NOT_FOUND, format!("index not found: {e}")))?;
|
||||
|
||||
let results = search::search(&query_vec, &embeddings, top_k);
|
||||
|
||||
Ok(Json(SearchResponse {
|
||||
results,
|
||||
query: req.query,
|
||||
}))
|
||||
}
|
||||
|
||||
// --- RAG ---
|
||||
|
||||
/// POST /vectors/rag request body.
#[derive(Deserialize)]
struct RagRequest {
    /// Which index to retrieve context from.
    index_name: String,
    /// Natural-language question to answer.
    question: String,
    /// How many chunks of context to retrieve (default 5).
    top_k: Option<usize>,
}
|
||||
|
||||
async fn rag_query(
|
||||
State(state): State<VectorState>,
|
||||
Json(req): Json<RagRequest>,
|
||||
) -> impl IntoResponse {
|
||||
let top_k = req.top_k.unwrap_or(5);
|
||||
|
||||
match rag::query(&req.question, &req.index_name, top_k, &state.store, &state.ai_client).await {
|
||||
Ok(resp) => Ok(Json(resp)),
|
||||
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||
}
|
||||
}
|
||||
116
crates/vectord/src/store.rs
Normal file
116
crates/vectord/src/store.rs
Normal file
@ -0,0 +1,116 @@
|
||||
/// Vector storage as Parquet files.
|
||||
/// Each embedding index is stored as: source, doc_id, chunk_idx, chunk_text, vector (binary blob).
|
||||
/// Vectors are stored as raw f32 bytes for compact storage and fast loading.
|
||||
|
||||
use arrow::array::{ArrayRef, BinaryArray, Float32Array, Int32Array, RecordBatch, StringArray};
|
||||
use arrow::datatypes::{DataType, Field, Schema};
|
||||
use bytes::Bytes;
|
||||
use object_store::ObjectStore;
|
||||
use std::sync::Arc;
|
||||
|
||||
use storaged::ops;
|
||||
|
||||
use crate::chunker::TextChunk;
|
||||
|
||||
/// A stored embedding — chunk text + its vector.
///
/// The in-memory form of one row of the Parquet index: provenance fields,
/// the original chunk text (kept so RAG can build context without a second
/// lookup), and the f32 vector decoded from its binary blob.
#[derive(Debug, Clone)]
pub struct StoredEmbedding {
    /// Source identifier (dataset name, filename, etc.)
    pub source: String,
    /// Document ID within the source
    pub doc_id: String,
    /// Chunk position within the document
    pub chunk_idx: u32,
    /// The chunk's raw text
    pub chunk_text: String,
    /// Embedding vector (stored on disk as little-endian f32 bytes)
    pub vector: Vec<f32>,
}
|
||||
|
||||
/// Store embeddings as a Parquet file in object storage.
///
/// Writes one row per chunk with columns
/// (source, doc_id, chunk_idx, chunk_text, vector), where `vector` is the
/// embedding downcast f64→f32 and packed as little-endian bytes into a
/// Binary column. Column ORDER here must stay in sync with
/// `load_embeddings`, which reads columns by position. Returns the object
/// key written (`vectors/<index_name>.parquet`).
///
/// # Errors
/// Returns an error string when `chunks` and `vectors` lengths differ, the
/// RecordBatch can't be assembled, Parquet encoding fails, or the object
/// store write fails.
pub async fn store_embeddings(
    store: &Arc<dyn ObjectStore>,
    index_name: &str,
    chunks: &[TextChunk],
    vectors: &[Vec<f64>], // from embedding API (f64), we store as f32
) -> Result<String, String> {
    if chunks.len() != vectors.len() {
        return Err(format!("chunk count ({}) != vector count ({})", chunks.len(), vectors.len()));
    }

    let n = chunks.len();
    // Columnar views over the chunk metadata.
    let sources: Vec<&str> = chunks.iter().map(|c| c.source.as_str()).collect();
    let doc_ids: Vec<&str> = chunks.iter().map(|c| c.doc_id.as_str()).collect();
    // u32 → i32 narrowing for Arrow's Int32; per-document chunk counts this
    // large don't occur in practice.
    let chunk_idxs: Vec<i32> = chunks.iter().map(|c| c.chunk_idx as i32).collect();
    let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();

    // Store vectors as raw f32 bytes (compact binary blob)
    let vector_bytes: Vec<Vec<u8>> = vectors.iter().map(|v| {
        v.iter().map(|&x| x as f32).flat_map(|f| f.to_le_bytes()).collect()
    }).collect();
    let vector_refs: Vec<&[u8]> = vector_bytes.iter().map(|v| v.as_slice()).collect();

    let schema = Arc::new(Schema::new(vec![
        Field::new("source", DataType::Utf8, false),
        Field::new("doc_id", DataType::Utf8, false),
        Field::new("chunk_idx", DataType::Int32, false),
        Field::new("chunk_text", DataType::Utf8, false),
        Field::new("vector", DataType::Binary, false),
    ]));

    // Arrays must be in the same order as the schema fields above.
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(sources)),
        Arc::new(StringArray::from(doc_ids)),
        Arc::new(Int32Array::from(chunk_idxs)),
        Arc::new(StringArray::from(texts)),
        Arc::new(BinaryArray::from(vector_refs)),
    ];

    let batch = RecordBatch::try_new(schema, arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;

    let parquet = shared::arrow_helpers::record_batch_to_parquet(&batch)?;
    let key = format!("vectors/{index_name}.parquet");
    ops::put(store, &key, parquet).await?;

    tracing::info!("stored {n} embeddings in {key}");
    Ok(key)
}
|
||||
|
||||
/// Load all embeddings from a vector index file.
///
/// Reads `vectors/<index_name>.parquet` and decodes every row of every
/// record batch back into a `StoredEmbedding`. Columns are accessed by
/// POSITION (0..4) and must match the schema written by `store_embeddings`;
/// vectors are reassembled from little-endian f32 bytes (any trailing bytes
/// not forming a full 4-byte group are dropped by `chunks_exact`).
///
/// # Errors
/// Returns an error string when the object is missing, the Parquet payload
/// can't be decoded, or a column has an unexpected Arrow type.
pub async fn load_embeddings(
    store: &Arc<dyn ObjectStore>,
    index_name: &str,
) -> Result<Vec<StoredEmbedding>, String> {
    let key = format!("vectors/{index_name}.parquet");
    let data = ops::get(store, &key).await?;

    let (_, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)?;

    let mut embeddings = Vec::new();
    for batch in &batches {
        // Downcast each column by position; order mirrors store_embeddings.
        let sources = batch.column(0).as_any().downcast_ref::<StringArray>()
            .ok_or("source column not string")?;
        let doc_ids = batch.column(1).as_any().downcast_ref::<StringArray>()
            .ok_or("doc_id column not string")?;
        let chunk_idxs = batch.column(2).as_any().downcast_ref::<Int32Array>()
            .ok_or("chunk_idx column not int")?;
        let texts = batch.column(3).as_any().downcast_ref::<StringArray>()
            .ok_or("chunk_text column not string")?;
        let vectors = batch.column(4).as_any().downcast_ref::<BinaryArray>()
            .ok_or("vector column not binary")?;

        for i in 0..batch.num_rows() {
            // Decode the binary blob: consecutive little-endian f32 values.
            let vec_bytes = vectors.value(i);
            let vector: Vec<f32> = vec_bytes.chunks_exact(4)
                .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
                .collect();

            embeddings.push(StoredEmbedding {
                source: sources.value(i).to_string(),
                doc_id: doc_ids.value(i).to_string(),
                chunk_idx: chunk_idxs.value(i) as u32,
                chunk_text: texts.value(i).to_string(),
                vector,
            });
        }
    }

    tracing::info!("loaded {} embeddings from {key}", embeddings.len());
    Ok(embeddings)
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "0927b27a-80a9-4790-a34f-bda7ff176aac",
|
||||
"name": "job_orders",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/job_orders.parquet",
|
||||
"size_bytes": 225889,
|
||||
"created_at": "2026-03-27T13:11:41.384341257Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:11:41.384344032Z",
|
||||
"updated_at": "2026-03-27T13:11:41.384344032Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "0bf1eb1f-b182-4025-9b44-b8553e678bcf",
|
||||
"name": "timesheets",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/timesheets.parquet",
|
||||
"size_bytes": 2458229,
|
||||
"created_at": "2026-03-27T13:11:42.084209718Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:11:42.084217486Z",
|
||||
"updated_at": "2026-03-27T13:11:42.084217486Z"
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
{
|
||||
"id": "1ca61945-d151-490b-81fd-2ca0397b68fa",
|
||||
"name": "sms_messages",
|
||||
"schema_fingerprint": "e1d079cbb2b7eedae5019767a886bd9a3396e291aa03630b9db69e9864948c09",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/sms_messages.parquet",
|
||||
"size_bytes": 2018,
|
||||
"created_at": "2026-03-27T13:07:14.253881797Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:07:14.253886027Z",
|
||||
"updated_at": "2026-03-27T13:07:14.253886027Z"
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
{
|
||||
"id": "478072c3-0c95-46a2-9193-f4b3ac4085ab",
|
||||
"name": "test_ingest",
|
||||
"schema_fingerprint": "4bdc4e5baeddc1187aecd4bfb788654f26145c2ba346b4bec6ca8ab950e1c133",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/test_ingest.parquet",
|
||||
"size_bytes": 3129,
|
||||
"created_at": "2026-03-27T13:06:57.437484309Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:06:57.437488259Z",
|
||||
"updated_at": "2026-03-27T13:06:57.437488259Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "4be87c74-10b4-463c-b69d-f20c9cd18ed7",
|
||||
"name": "candidates",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/candidates.parquet",
|
||||
"size_bytes": 2003395,
|
||||
"created_at": "2026-03-27T13:11:41.341589905Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:11:41.341599187Z",
|
||||
"updated_at": "2026-03-27T13:11:41.341599187Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "75bb6855-488b-4300-89c2-970871bd99cc",
|
||||
"name": "email_log",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/email_log.parquet",
|
||||
"size_bytes": 1873775,
|
||||
"created_at": "2026-03-27T13:11:42.757205427Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:11:42.757211105Z",
|
||||
"updated_at": "2026-03-27T13:11:42.757211105Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "ad393eee-ba0c-4338-9a8b-236bba3816ac",
|
||||
"name": "placements",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/placements.parquet",
|
||||
"size_bytes": 217395,
|
||||
"created_at": "2026-03-27T13:11:41.433628136Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:11:41.433633927Z",
|
||||
"updated_at": "2026-03-27T13:11:41.433633927Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "b334b1eb-d7a2-473f-a7fa-017b17de74bd",
|
||||
"name": "clients",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/clients.parquet",
|
||||
"size_bytes": 34228,
|
||||
"created_at": "2026-03-27T13:11:41.350247882Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:11:41.350250705Z",
|
||||
"updated_at": "2026-03-27T13:11:41.350250705Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "e015f0e2-51e4-4301-855d-76c54992c5b9",
|
||||
"name": "call_log",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/call_log.parquet",
|
||||
"size_bytes": 3276693,
|
||||
"created_at": "2026-03-27T13:11:42.483220340Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:11:42.483225870Z",
|
||||
"updated_at": "2026-03-27T13:11:42.483225870Z"
|
||||
}
|
||||
BIN
data/datasets/call_log.parquet
Normal file
BIN
data/datasets/call_log.parquet
Normal file
Binary file not shown.
BIN
data/datasets/candidates.parquet
Normal file
BIN
data/datasets/candidates.parquet
Normal file
Binary file not shown.
BIN
data/datasets/clients.parquet
Normal file
BIN
data/datasets/clients.parquet
Normal file
Binary file not shown.
BIN
data/datasets/email_log.parquet
Normal file
BIN
data/datasets/email_log.parquet
Normal file
Binary file not shown.
BIN
data/datasets/job_orders.parquet
Normal file
BIN
data/datasets/job_orders.parquet
Normal file
Binary file not shown.
BIN
data/datasets/placements.parquet
Normal file
BIN
data/datasets/placements.parquet
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
data/datasets/timesheets.parquet
Normal file
BIN
data/datasets/timesheets.parquet
Normal file
Binary file not shown.
BIN
data/vectors/candidate_resumes.parquet
Normal file
BIN
data/vectors/candidate_resumes.parquet
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user