diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs index c0360a5..e94f19b 100644 --- a/crates/catalogd/src/registry.rs +++ b/crates/catalogd/src/registry.rs @@ -1,6 +1,6 @@ use shared::types::{ DatasetId, DatasetManifest, ObjectRef, SchemaFingerprint, - ColumnMeta, Lineage, FreshnessContract, Sensitivity, + ColumnMeta, Lineage, FreshnessContract, RefreshPolicy, Sensitivity, }; use std::collections::HashMap; use std::sync::Arc; @@ -20,6 +20,8 @@ pub struct MetadataUpdate { pub lineage: Option, pub freshness: Option, pub row_count: Option, + // Phase C embedding freshness + pub embedding_refresh_policy: Option, } const MANIFEST_PREFIX: &str = "_catalog/manifests"; @@ -78,6 +80,9 @@ impl Registry { freshness: None, tags: vec![], row_count: None, + last_embedded_at: None, + embedding_stale_since: None, + embedding_refresh_policy: None, }; // Write-ahead: persist before in-memory update @@ -111,6 +116,7 @@ impl Registry { if let Some(lineage) = updates.lineage { manifest.lineage = Some(lineage); } if let Some(freshness) = updates.freshness { manifest.freshness = Some(freshness); } if let Some(count) = updates.row_count { manifest.row_count = Some(count); } + if let Some(policy) = updates.embedding_refresh_policy { manifest.embedding_refresh_policy = Some(policy); } manifest.updated_at = chrono::Utc::now(); // Persist @@ -242,6 +248,63 @@ impl Registry { (ok, err) } + /// Mark a dataset's embeddings as stale (row-level data has been written + /// since the last embedding refresh). Idempotent — setting stale when + /// already stale is a no-op. Only marks stale if the dataset has been + /// embedded before — a never-embedded dataset doesn't need a stale flag + /// (it just needs an initial index build). Called from the ingest path. + pub async fn mark_embeddings_stale(&self, name: &str) -> Result<(), String> { + let mut datasets = self.datasets.write().await; + let manifest = datasets + .values_mut() + .find(|d| d.name == name) + .ok_or_else(|| format!("dataset not found: {name}"))?; + + if manifest.last_embedded_at.is_none() { + return Ok(()); // never embedded -> no stale semantics yet + } + if manifest.embedding_stale_since.is_none() { + manifest.embedding_stale_since = Some(chrono::Utc::now()); + manifest.updated_at = chrono::Utc::now(); + + let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id); + let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?; + ops::put(&self.store, &key, json.into()).await?; + tracing::info!("marked embeddings stale for dataset '{name}'"); + } + Ok(()) + } + + /// Clear the stale marker and set `last_embedded_at = now`. + /// Called by the embedding refresh pipeline once it finishes. + pub async fn clear_embeddings_stale(&self, name: &str) -> Result<(), String> { + let mut datasets = self.datasets.write().await; + let manifest = datasets + .values_mut() + .find(|d| d.name == name) + .ok_or_else(|| format!("dataset not found: {name}"))?; + + let now = chrono::Utc::now(); + manifest.embedding_stale_since = None; + manifest.last_embedded_at = Some(now); + manifest.updated_at = now; + + let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id); + let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?; + ops::put(&self.store, &key, json.into()).await?; + Ok(()) + } + + /// List datasets whose `embedding_stale_since` is set — they need a refresh. + pub async fn stale_datasets(&self) -> Vec { + let datasets = self.datasets.read().await; + datasets + .values() + .filter(|d| d.embedding_stale_since.is_some()) + .cloned() + .collect() + } + /// Add objects to an existing dataset. pub async fn add_objects( &self, diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index b7e8633..256732d 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -98,6 +98,7 @@ async fn main() { hnsw_store: vectord::hnsw::HnswStore::new(), embedding_cache: vectord::embedding_cache::EmbeddingCache::new(store.clone()), trial_journal: vectord::trial::TrialJournal::new(store.clone()), + catalog: registry.clone(), } })) .nest("/workspaces", queryd::workspace_service::router(workspace_mgr)) diff --git a/crates/ingestd/src/pipeline.rs b/crates/ingestd/src/pipeline.rs index 65c056f..c28b518 100644 --- a/crates/ingestd/src/pipeline.rs +++ b/crates/ingestd/src/pipeline.rs @@ -147,6 +147,10 @@ pub async fn ingest_file( ..Default::default() }).await; + // Phase C: if this dataset already had embeddings, they're now stale. + // mark_embeddings_stale is a no-op for never-embedded datasets. + let _ = registry.mark_embeddings_stale(&safe_name).await; + Ok(IngestResult { dataset_name: safe_name, file_type: format!("{:?}", file_type), diff --git a/crates/ingestd/src/service.rs b/crates/ingestd/src/service.rs index c1fa950..ddb946c 100644 --- a/crates/ingestd/src/service.rs +++ b/crates/ingestd/src/service.rs @@ -271,6 +271,10 @@ async fn ingest_db_stream( ..Default::default() }).await; + // Phase C: mark embeddings stale if the dataset already had a vector + // index. No-op for newly-created datasets. + let _ = state.registry.mark_embeddings_stale(&dataset_name).await; + Ok((StatusCode::CREATED, Json(serde_json::json!({ "dataset_name": dataset_name, "table": stream_result.table, diff --git a/crates/shared/src/types.rs b/crates/shared/src/types.rs index 5b734fd..de34945 100644 --- a/crates/shared/src/types.rs +++ b/crates/shared/src/types.rs @@ -116,4 +116,44 @@ pub struct DatasetManifest { /// Row count (updated on ingest/compact) #[serde(default)] pub row_count: Option, + + // --- Embedding freshness (Phase C) --- + + /// When the attached vector index was last refreshed. `None` means this + /// dataset has no vector index yet, or has never been embedded. + #[serde(default)] + pub last_embedded_at: Option>, + + /// When data was written that hasn't yet been embedded. `Some(t)` means + /// the vector index is out-of-date as of timestamp `t`. Cleared on refresh. + #[serde(default)] + pub embedding_stale_since: Option>, + + /// How this dataset wants stale embeddings handled. + #[serde(default)] + pub embedding_refresh_policy: Option, +} + +/// Controls what happens when new data is written to a dataset with an +/// attached vector index. +/// +/// - `Manual` (default): data writes set `embedding_stale_since`; nothing +/// embeds until an operator or agent calls `/vectors/refresh/{dataset}`. +/// - `OnAppend`: ingest fires a background refresh immediately after writing. +/// Suitable for datasets where vector freshness matters more than ingest +/// latency. +/// - `Scheduled(cron)`: a timer or external scheduler triggers refresh at +/// the named cadence. The scheduler itself is not in this ADR scope — +/// this just declares the intent so operators can see which policy a +/// dataset expects. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum RefreshPolicy { + Manual, + OnAppend, + Scheduled { cron: String }, +} + +impl Default for RefreshPolicy { + fn default() -> Self { Self::Manual } } diff --git a/crates/vectord/Cargo.toml b/crates/vectord/Cargo.toml index 25ec9f5..b58e98b 100644 --- a/crates/vectord/Cargo.toml +++ b/crates/vectord/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" shared = { path = "../shared" } storaged = { path = "../storaged" } aibridge = { path = "../aibridge" } +catalogd = { path = "../catalogd" } tokio = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs index caa7cf9..5f751d1 100644 --- a/crates/vectord/src/lib.rs +++ b/crates/vectord/src/lib.rs @@ -4,6 +4,7 @@ pub mod harness; pub mod hnsw; pub mod index_registry; pub mod jobs; +pub mod refresh; pub mod store; pub mod search; pub mod rag; diff --git a/crates/vectord/src/refresh.rs b/crates/vectord/src/refresh.rs new file mode 100644 index 0000000..970a395 --- /dev/null +++ b/crates/vectord/src/refresh.rs @@ -0,0 +1,279 @@ +//! Phase C: Decoupled embedding refresh. +//! +//! When a dataset's row-level data changes, the vector index it feeds is +//! stale. Historically we coupled ingest and embedding — writing new rows +//! also re-embedded every row, which is fine at 500 rows but blows up at +//! 100K+. The llms3.com architecture calls out "asynchronous vector +//! refresh cycles independent from transactional mutations" as the right +//! pattern. +//! +//! This module implements the refresh side. The ingest path marks +//! embeddings stale (see catalogd::registry::mark_embeddings_stale); this +//! code clears that staleness by embedding only rows whose `doc_id` isn't +//! already in the existing vector index. +//! +//! Scope — MVP: +//! - Reads the dataset's Parquet, extracts (doc_id, text) pairs from named +//! columns +//! - Loads existing embeddings via EmbeddingCache +//! - Filters to rows whose doc_id is NOT in the existing set +//! - Chunks, embeds via Ollama, appends to the index parquet +//! - Clears stale flag on success +//! +//! Not in MVP: +//! - UPDATE semantics (same doc_id, new content) — would need content-hash +//! comparison per row +//! - Large-scale resilience (batching with checkpoints like the +//! supervisor) — MVP does it inline +//! - Lance backend — ADR-019 makes this straightforward later; MVP stays +//! on Parquet sidecar indexes + +use std::collections::HashSet; +use std::sync::Arc; + +use aibridge::client::{AiClient, EmbedRequest}; +use arrow::array::{Array, Int32Array, Int64Array, StringArray}; +use catalogd::registry::Registry; +use object_store::ObjectStore; + +use crate::chunker::{self, TextChunk}; +use crate::embedding_cache::EmbeddingCache; +use crate::index_registry::IndexRegistry; +use crate::store::{self, StoredEmbedding}; + +#[derive(Debug, Clone, serde::Deserialize)] +pub struct RefreshRequest { + pub index_name: String, + /// Column with the document id (row identity). + pub id_column: String, + /// Column with the text to embed. + pub text_column: String, + /// Chunk size (chars). Defaults to 500. + #[serde(default)] + pub chunk_size: Option, + /// Overlap (chars). Defaults to 50. + #[serde(default)] + pub overlap: Option, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct RefreshResult { + pub index_name: String, + pub dataset_name: String, + pub pre_existing_docs: usize, + pub dataset_docs: usize, + pub new_docs_embedded: usize, + pub new_chunks_embedded: usize, + pub total_embeddings_after: usize, + pub duration_secs: f32, + pub stale_cleared: bool, +} + +/// Full refresh pipeline. Takes dataset_name as URL param, body has the +/// column selectors. +pub async fn refresh_index( + dataset_name: &str, + req: &RefreshRequest, + store_: &Arc, + registry: &Registry, + ai_client: &AiClient, + embedding_cache: &EmbeddingCache, + index_registry: &IndexRegistry, +) -> Result { + let t0 = std::time::Instant::now(); + + // 1. Find the dataset manifest, pull its object storage key + let manifest = registry + .get_by_name(dataset_name) + .await + .ok_or_else(|| format!("dataset not found: {dataset_name}"))?; + + if manifest.objects.is_empty() { + return Err(format!("dataset '{dataset_name}' has no object references")); + } + + // 2. Read dataset rows — extract (doc_id, text) pairs from the + // specified columns. Multi-object datasets: concat across all. + let mut doc_id_to_text: Vec<(String, String)> = Vec::new(); + for obj in &manifest.objects { + let data = storaged::ops::get(store_, &obj.key).await + .map_err(|e| format!("read {}: {e}", obj.key))?; + let (schema, batches) = shared::arrow_helpers::parquet_to_record_batches(&data) + .map_err(|e| format!("parse {}: {e}", obj.key))?; + + let id_idx = schema.index_of(&req.id_column) + .map_err(|_| format!("id column '{}' not in dataset schema", req.id_column))?; + let text_idx = schema.index_of(&req.text_column) + .map_err(|_| format!("text column '{}' not in dataset schema", req.text_column))?; + + for batch in &batches { + let text_col = batch + .column(text_idx) + .as_any() + .downcast_ref::() + .ok_or_else(|| format!("text column '{}' is not Utf8", req.text_column))?; + + // Accept Utf8, Int32, or Int64 as id — that covers CSV-derived + // and Postgres-imported schemas without forcing upstream casts. + let id_reader: Box Option> = { + let col = batch.column(id_idx); + if let Some(s) = col.as_any().downcast_ref::() { + let s = s.clone(); + Box::new(move |row| if s.is_null(row) { None } else { Some(s.value(row).to_string()) }) + } else if let Some(a) = col.as_any().downcast_ref::() { + let a = a.clone(); + Box::new(move |row| if a.is_null(row) { None } else { Some(a.value(row).to_string()) }) + } else if let Some(a) = col.as_any().downcast_ref::() { + let a = a.clone(); + Box::new(move |row| if a.is_null(row) { None } else { Some(a.value(row).to_string()) }) + } else { + return Err(format!( + "id column '{}' must be Utf8, Int32, or Int64 — got {}", + req.id_column, + col.data_type(), + )); + } + }; + + for row in 0..batch.num_rows() { + if text_col.is_null(row) { continue; } + let Some(id) = id_reader(row) else { continue; }; + let text = text_col.value(row).to_string(); + if text.trim().is_empty() { continue; } + doc_id_to_text.push((id, text)); + } + } + } + let dataset_docs = doc_id_to_text.len(); + tracing::info!("refresh '{}': dataset has {dataset_docs} rows", dataset_name); + + // 3. Load existing embeddings (empty if no index yet) + let existing: Vec = match embedding_cache.get_or_load(&req.index_name).await { + Ok(arc) => arc.as_ref().clone(), + Err(_) => Vec::new(), // first-time index build + }; + let pre_existing_docs: HashSet = existing + .iter() + .map(|e| e.doc_id.clone()) + .collect(); + let pre_existing_count = pre_existing_docs.len(); + + // 4. Delta — rows whose doc_id isn't already indexed + let new_rows: Vec<(String, String)> = doc_id_to_text + .into_iter() + .filter(|(id, _)| !pre_existing_docs.contains(id)) + .collect(); + let new_docs = new_rows.len(); + + if new_docs == 0 { + tracing::info!("refresh '{}': no new docs to embed", dataset_name); + registry.clear_embeddings_stale(dataset_name).await?; + return Ok(RefreshResult { + index_name: req.index_name.clone(), + dataset_name: dataset_name.to_string(), + pre_existing_docs: pre_existing_count, + dataset_docs, + new_docs_embedded: 0, + new_chunks_embedded: 0, + total_embeddings_after: existing.len(), + duration_secs: t0.elapsed().as_secs_f32(), + stale_cleared: true, + }); + } + + // 5. Chunk the new rows + let chunk_size = req.chunk_size.unwrap_or(500); + let overlap = req.overlap.unwrap_or(50); + let doc_ids: Vec = new_rows.iter().map(|(id, _)| id.clone()).collect(); + let texts: Vec = new_rows.iter().map(|(_, t)| t.clone()).collect(); + let chunks: Vec = chunker::chunk_column( + dataset_name, &doc_ids, &texts, chunk_size, overlap, + ); + let new_chunks = chunks.len(); + tracing::info!("refresh '{}': {} new docs -> {} chunks", dataset_name, new_docs, new_chunks); + + // 6. Embed via Ollama (batched) + let batch_size = 32; + let mut all_vectors: Vec> = Vec::with_capacity(new_chunks); + for batch in chunks.chunks(batch_size) { + let batch_texts: Vec = batch.iter().map(|c| c.text.clone()).collect(); + let resp = ai_client + .embed(EmbedRequest { texts: batch_texts, model: None }) + .await + .map_err(|e| format!("embed: {e}"))?; + all_vectors.extend(resp.embeddings); + } + + // 7. Combine existing + new and write back as a single parquet + // (MVP — append-as-rewrite. ADR-019 points to Lance for true native + // append; for the Parquet sidecar we rewrite at refresh time.) + let mut new_stored: Vec = existing.clone(); + for (chunk, vector) in chunks.into_iter().zip(all_vectors.iter()) { + new_stored.push(StoredEmbedding { + source: chunk.source, + doc_id: chunk.doc_id, + chunk_idx: chunk.chunk_idx, + chunk_text: chunk.text, + vector: vector.iter().map(|&x| x as f32).collect(), + }); + } + + // We have to reconstruct chunks + vectors for store_embeddings — but + // store_embeddings takes &[TextChunk] and vectors separately. Convert + // the combined StoredEmbedding back to that shape. + let combined_chunks: Vec = new_stored + .iter() + .map(|e| TextChunk { + source: e.source.clone(), + doc_id: e.doc_id.clone(), + chunk_idx: e.chunk_idx, + text: e.chunk_text.clone(), + }) + .collect(); + let combined_vectors: Vec> = new_stored + .iter() + .map(|e| e.vector.iter().map(|&f| f as f64).collect()) + .collect(); + + let _key = store::store_embeddings( + store_, &req.index_name, &combined_chunks, &combined_vectors, + ).await?; + + // 8. Evict embedding cache — next read will pick up the new file + let _ = embedding_cache.evict(&req.index_name).await; + + // 9. Update index registry metadata (row/chunk counts; others unchanged + // so we don't disturb the existing entry if present) + let _ = try_update_index_meta(index_registry, &req.index_name, new_stored.len()).await; + + // 10. Clear stale flag + registry.clear_embeddings_stale(dataset_name).await?; + + let total = new_stored.len(); + Ok(RefreshResult { + index_name: req.index_name.clone(), + dataset_name: dataset_name.to_string(), + pre_existing_docs: pre_existing_count, + dataset_docs, + new_docs_embedded: new_docs, + new_chunks_embedded: new_chunks, + total_embeddings_after: total, + duration_secs: t0.elapsed().as_secs_f32(), + stale_cleared: true, + }) +} + +/// Best-effort refresh of index registry metadata. If the index exists, +/// bump the chunk_count; if not, this is a no-op. +async fn try_update_index_meta( + index_registry: &IndexRegistry, + index_name: &str, + chunk_count: usize, +) -> Result<(), String> { + if let Some(mut meta) = index_registry.get(index_name).await { + meta.chunk_count = chunk_count; + index_registry.register(meta).await + } else { + Ok(()) + } +} diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index 9ef6f87..0c7f85c 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -10,7 +10,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use aibridge::client::{AiClient, EmbedRequest}; -use crate::{chunker, embedding_cache, harness, hnsw, index_registry, jobs, rag, search, store, supervisor, trial}; +use catalogd::registry::Registry as CatalogRegistry; +use crate::{chunker, embedding_cache, harness, hnsw, index_registry, jobs, rag, refresh, search, store, supervisor, trial}; #[derive(Clone)] pub struct VectorState { @@ -21,6 +22,9 @@ pub struct VectorState { pub hnsw_store: hnsw::HnswStore, pub embedding_cache: embedding_cache::EmbeddingCache, pub trial_journal: trial::TrialJournal, + /// Catalog registry — needed by the Phase C refresh path to mark/clear + /// staleness and look up dataset manifests. + pub catalog: CatalogRegistry, } pub fn router(state: VectorState) -> Router { @@ -47,6 +51,9 @@ pub fn router(state: VectorState) -> Router { // Cache management .route("/hnsw/cache/stats", get(cache_stats)) .route("/hnsw/cache/{index_name}", axum::routing::delete(cache_evict)) + // Phase C: embedding refresh + .route("/refresh/{dataset_name}", post(refresh_dataset)) + .route("/stale", get(list_stale)) .with_state(state) } @@ -666,3 +673,61 @@ async fn cache_evict( let ok = state.embedding_cache.evict(&index_name).await; Json(serde_json::json!({ "evicted": ok, "index_name": index_name })) } + +// --- Phase C: embedding refresh --- +// +// Decouples "new row data arrived" from "re-embed everything." Ingest marks +// a dataset's embeddings stale (see catalogd::registry::mark_embeddings_stale); +// `/vectors/refresh/{dataset}` diffs existing embeddings against current +// rows, embeds only the new ones, appends to the index, and clears the +// stale flag. + +async fn refresh_dataset( + State(state): State, + Path(dataset_name): Path, + Json(req): Json, +) -> Result, (StatusCode, String)> { + tracing::info!( + "refresh requested for dataset '{}' -> index '{}'", + dataset_name, req.index_name, + ); + match refresh::refresh_index( + &dataset_name, + &req, + &state.store, + &state.catalog, + &state.ai_client, + &state.embedding_cache, + &state.index_registry, + ) + .await + { + Ok(result) => Ok(Json(result)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +#[derive(Serialize)] +struct StaleEntry { + dataset_name: String, + last_embedded_at: Option, + stale_since: String, + refresh_policy: Option, +} + +async fn list_stale(State(state): State) -> impl IntoResponse { + let datasets = state.catalog.stale_datasets().await; + let entries: Vec = datasets + .into_iter() + .map(|d| StaleEntry { + dataset_name: d.name, + last_embedded_at: d.last_embedded_at.map(|t| t.to_rfc3339()), + stale_since: d + .embedding_stale_since + .map(|t| t.to_rfc3339()) + .unwrap_or_default(), + refresh_policy: d.embedding_refresh_policy, + }) + .collect(); + Json(entries) +} diff --git a/docs/PHASES.md b/docs/PHASES.md index 307da19..d7ae364 100644 --- a/docs/PHASES.md +++ b/docs/PHASES.md @@ -150,6 +150,18 @@ - `POST /ingest/db` endpoint: `{dsn, table, dataset_name?, batch_size?, order_by?, limit?}` → streams to Parquet, registers in catalog with PII detection + redacted-password lineage - Existing `POST /ingest/postgres/import` (structured config) preserved alongside - 4 DSN-parser unit tests + live end-to-end test against `knowledge_base.team_runs` (586 rows, 13 cols, 6 batches, 196ms) +- [x] Phase B: Lance storage evaluation — 2026-04-16 + - `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack + - 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard + - Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling. +- [x] Phase C: Decoupled embedding refresh — 2026-04-16 + - `DatasetManifest`: `last_embedded_at`, `embedding_stale_since`, `embedding_refresh_policy` (Manual | OnAppend | Scheduled) + - `Registry::mark_embeddings_stale` / `clear_embeddings_stale` / `stale_datasets` + - Ingest paths (CSV pipeline + Postgres streaming) auto-mark-stale when writing to an already-embedded dataset + - `vectord::refresh::refresh_index` — reads dataset, diffs doc_ids vs existing embeddings, embeds only new rows, writes combined index, clears stale + - `POST /vectors/refresh/{dataset}` + `GET /vectors/stale` + - Id columns accept `Utf8`, `Int32`, `Int64` + - End-to-end on threat_intel: initial 20-row embed 2.1s; re-ingest to 54 rows auto-marks stale; delta refresh embeds only 34 new in 970ms (6× faster than full re-embed); stale cleared - [ ] Database connector ingest (Postgres/MySQL) - [ ] PDF OCR (Tesseract) - [ ] Scheduled ingest (cron)