//! Phase C: Decoupled embedding refresh. //! //! When a dataset's row-level data changes, the vector index it feeds is //! stale. Historically we coupled ingest and embedding — writing new rows //! also re-embedded every row, which is fine at 500 rows but blows up at //! 100K+. The llms3.com architecture calls out "asynchronous vector //! refresh cycles independent from transactional mutations" as the right //! pattern. //! //! This module implements the refresh side. The ingest path marks //! embeddings stale (see catalogd::registry::mark_embeddings_stale); this //! code clears that staleness by embedding only rows whose `doc_id` isn't //! already in the existing vector index. //! //! Scope — MVP: //! - Reads the dataset's Parquet, extracts (doc_id, text) pairs from named //! columns //! - Loads existing embeddings via EmbeddingCache //! - Filters to rows whose doc_id is NOT in the existing set //! - Chunks, embeds via Ollama, appends to the index parquet //! - Clears stale flag on success //! //! Not in MVP: //! - UPDATE semantics (same doc_id, new content) — would need content-hash //! comparison per row //! - Large-scale resilience (batching with checkpoints like the //! supervisor) — MVP does it inline //! - Lance backend — ADR-019 makes this straightforward later; MVP stays //! on Parquet sidecar indexes use std::collections::HashSet; use std::sync::Arc; use aibridge::client::{AiClient, EmbedRequest}; use arrow::array::{Array, Int32Array, Int64Array, StringArray}; use catalogd::registry::Registry; use object_store::ObjectStore; use crate::chunker::{self, TextChunk}; use crate::embedding_cache::EmbeddingCache; use crate::index_registry::IndexRegistry; use crate::store::{self, StoredEmbedding}; #[derive(Debug, Clone, serde::Deserialize)] pub struct RefreshRequest { pub index_name: String, /// Column with the document id (row identity). pub id_column: String, /// Column with the text to embed. pub text_column: String, /// Chunk size (chars). Defaults to 500. #[serde(default)] pub chunk_size: Option, /// Overlap (chars). Defaults to 50. #[serde(default)] pub overlap: Option, } #[derive(Debug, Clone, serde::Serialize)] pub struct RefreshResult { pub index_name: String, pub dataset_name: String, pub pre_existing_docs: usize, pub dataset_docs: usize, pub new_docs_embedded: usize, pub new_chunks_embedded: usize, pub total_embeddings_after: usize, pub duration_secs: f32, pub stale_cleared: bool, } /// Full refresh pipeline. Takes dataset_name as URL param, body has the /// column selectors. pub async fn refresh_index( dataset_name: &str, req: &RefreshRequest, store_: &Arc, registry: &Registry, ai_client: &AiClient, embedding_cache: &EmbeddingCache, index_registry: &IndexRegistry, ) -> Result { let t0 = std::time::Instant::now(); // 1. Find the dataset manifest, pull its object storage key let manifest = registry .get_by_name(dataset_name) .await .ok_or_else(|| format!("dataset not found: {dataset_name}"))?; if manifest.objects.is_empty() { return Err(format!("dataset '{dataset_name}' has no object references")); } // 2. Read dataset rows — extract (doc_id, text) pairs from the // specified columns. Multi-object datasets: concat across all. let mut doc_id_to_text: Vec<(String, String)> = Vec::new(); for obj in &manifest.objects { let data = storaged::ops::get(store_, &obj.key).await .map_err(|e| format!("read {}: {e}", obj.key))?; let (schema, batches) = shared::arrow_helpers::parquet_to_record_batches(&data) .map_err(|e| format!("parse {}: {e}", obj.key))?; let id_idx = schema.index_of(&req.id_column) .map_err(|_| format!("id column '{}' not in dataset schema", req.id_column))?; let text_idx = schema.index_of(&req.text_column) .map_err(|_| format!("text column '{}' not in dataset schema", req.text_column))?; for batch in &batches { let text_col = batch .column(text_idx) .as_any() .downcast_ref::() .ok_or_else(|| format!("text column '{}' is not Utf8", req.text_column))?; // Accept Utf8, Int32, or Int64 as id — that covers CSV-derived // and Postgres-imported schemas without forcing upstream casts. let id_reader: Box Option> = { let col = batch.column(id_idx); if let Some(s) = col.as_any().downcast_ref::() { let s = s.clone(); Box::new(move |row| if s.is_null(row) { None } else { Some(s.value(row).to_string()) }) } else if let Some(a) = col.as_any().downcast_ref::() { let a = a.clone(); Box::new(move |row| if a.is_null(row) { None } else { Some(a.value(row).to_string()) }) } else if let Some(a) = col.as_any().downcast_ref::() { let a = a.clone(); Box::new(move |row| if a.is_null(row) { None } else { Some(a.value(row).to_string()) }) } else { return Err(format!( "id column '{}' must be Utf8, Int32, or Int64 — got {}", req.id_column, col.data_type(), )); } }; for row in 0..batch.num_rows() { if text_col.is_null(row) { continue; } let Some(id) = id_reader(row) else { continue; }; let text = text_col.value(row).to_string(); if text.trim().is_empty() { continue; } doc_id_to_text.push((id, text)); } } } let dataset_docs = doc_id_to_text.len(); tracing::info!("refresh '{}': dataset has {dataset_docs} rows", dataset_name); // 3. Load existing embeddings (empty if no index yet) let existing: Vec = match embedding_cache.get_or_load(&req.index_name).await { Ok(arc) => arc.as_ref().clone(), Err(_) => Vec::new(), // first-time index build }; let pre_existing_docs: HashSet = existing .iter() .map(|e| e.doc_id.clone()) .collect(); let pre_existing_count = pre_existing_docs.len(); // 4. Delta — rows whose doc_id isn't already indexed let new_rows: Vec<(String, String)> = doc_id_to_text .into_iter() .filter(|(id, _)| !pre_existing_docs.contains(id)) .collect(); let new_docs = new_rows.len(); if new_docs == 0 { tracing::info!("refresh '{}': no new docs to embed", dataset_name); // Even on no-op, make sure index metadata is registered so // downstream discovery (profile activation, A/B search) sees // indexes built by earlier refresh calls that predated the // auto-register behavior. let _ = try_update_index_meta(index_registry, &req.index_name, existing.len()).await; registry.clear_embeddings_stale(dataset_name).await?; return Ok(RefreshResult { index_name: req.index_name.clone(), dataset_name: dataset_name.to_string(), pre_existing_docs: pre_existing_count, dataset_docs, new_docs_embedded: 0, new_chunks_embedded: 0, total_embeddings_after: existing.len(), duration_secs: t0.elapsed().as_secs_f32(), stale_cleared: true, }); } // 5. Chunk the new rows let chunk_size = req.chunk_size.unwrap_or(500); let overlap = req.overlap.unwrap_or(50); let doc_ids: Vec = new_rows.iter().map(|(id, _)| id.clone()).collect(); let texts: Vec = new_rows.iter().map(|(_, t)| t.clone()).collect(); let chunks: Vec = chunker::chunk_column( dataset_name, &doc_ids, &texts, chunk_size, overlap, ); let new_chunks = chunks.len(); tracing::info!("refresh '{}': {} new docs -> {} chunks", dataset_name, new_docs, new_chunks); // 6. Embed via Ollama (batched) let batch_size = 32; let mut all_vectors: Vec> = Vec::with_capacity(new_chunks); for batch in chunks.chunks(batch_size) { let batch_texts: Vec = batch.iter().map(|c| c.text.clone()).collect(); let resp = ai_client .embed(EmbedRequest { texts: batch_texts, model: None }) .await .map_err(|e| format!("embed: {e}"))?; all_vectors.extend(resp.embeddings); } // 7. Combine existing + new and write back as a single parquet // (MVP — append-as-rewrite. ADR-019 points to Lance for true native // append; for the Parquet sidecar we rewrite at refresh time.) let mut new_stored: Vec = existing.clone(); for (chunk, vector) in chunks.into_iter().zip(all_vectors.iter()) { new_stored.push(StoredEmbedding { source: chunk.source, doc_id: chunk.doc_id, chunk_idx: chunk.chunk_idx, chunk_text: chunk.text, vector: vector.iter().map(|&x| x as f32).collect(), }); } // We have to reconstruct chunks + vectors for store_embeddings — but // store_embeddings takes &[TextChunk] and vectors separately. Convert // the combined StoredEmbedding back to that shape. let combined_chunks: Vec = new_stored .iter() .map(|e| TextChunk { source: e.source.clone(), doc_id: e.doc_id.clone(), chunk_idx: e.chunk_idx, text: e.chunk_text.clone(), }) .collect(); let combined_vectors: Vec> = new_stored .iter() .map(|e| e.vector.iter().map(|&f| f as f64).collect()) .collect(); let _key = store::store_embeddings( store_, &req.index_name, &combined_chunks, &combined_vectors, ).await?; // 8. Evict embedding cache — next read will pick up the new file let _ = embedding_cache.evict(&req.index_name).await; // 9. Update index registry metadata (row/chunk counts; others unchanged // so we don't disturb the existing entry if present) let _ = try_update_index_meta(index_registry, &req.index_name, new_stored.len()).await; // 10. Clear stale flag registry.clear_embeddings_stale(dataset_name).await?; let total = new_stored.len(); Ok(RefreshResult { index_name: req.index_name.clone(), dataset_name: dataset_name.to_string(), pre_existing_docs: pre_existing_count, dataset_docs, new_docs_embedded: new_docs, new_chunks_embedded: new_chunks, total_embeddings_after: total, duration_secs: t0.elapsed().as_secs_f32(), stale_cleared: true, }) } /// Best-effort refresh of index registry metadata. /// - If the index already exists, bump the chunk_count. /// - If it's brand new (first-time build via refresh), REGISTER it so /// downstream discovery (profile activation, A/B search) can find it. async fn try_update_index_meta( index_registry: &IndexRegistry, index_name: &str, chunk_count: usize, ) -> Result<(), String> { if let Some(mut meta) = index_registry.get(index_name).await { meta.chunk_count = chunk_count; return index_registry.register(meta).await; } // First-time registration — infer reasonable defaults. Convention: // index name `{source}_v1` or `{source}_vN` implies a source dataset // named by stripping the `_vN` suffix. Otherwise use the index name // as the source and let the caller patch later. let source = match index_name.rsplit_once('_') { Some((base, suffix)) if suffix.starts_with('v') && suffix[1..].chars().all(|c| c.is_ascii_digit()) => { base.to_string() } _ => index_name.to_string(), }; let meta = crate::index_registry::IndexMeta { index_name: index_name.to_string(), source, model_name: "nomic-embed-text".to_string(), model_version: "latest".to_string(), dimensions: 768, chunk_count, doc_count: chunk_count, chunk_size: 500, overlap: 50, storage_key: format!("vectors/{index_name}.parquet"), created_at: chrono::Utc::now(), build_time_secs: 0.0, chunks_per_sec: 0.0, bucket: "primary".to_string(), vector_backend: shared::types::VectorBackend::Parquet, id_prefix: None, last_used: None, build_signature: None, }; index_registry.register(meta).await }