Five threads of work landing as one milestone — all individually
verified end-to-end against real data, full release build clean,
46 unit tests pass.
## Phase 16.2 / 16.5 — autotune agent + ingest triggers
`vectord::agent` is a long-running tokio task that watches the trial
journal and autonomously proposes + runs new HNSW configs. Distinct
from `autotune::run_autotune` (synchronous one-shot grid). Triggered
on POST /vectors/agent/enqueue/{idx} or by the periodic wake; ingest
paths now push DatasetAppended events when an index's source dataset
gets re-ingested. Rate-limited (max_trials_per_hour) and cooldown-
gated so it can't saturate Ollama under live load.
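The gating can be sketched with stdlib types. This is a hypothetical `TrialGate` — field and method names are invented here, and the real agent wires the equivalent logic into its tokio task and config — but it shows the two checks (hourly cap, post-trial cooldown) composing:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Gate deciding whether the agent may launch another trial.
/// `max_trials_per_hour` caps throughput; `cooldown` enforces a minimum
/// gap after the previous trial finishes. (Sketch; names hypothetical.)
struct TrialGate {
    max_trials_per_hour: usize,
    cooldown: Duration,
    recent: VecDeque<Instant>, // start times within the last hour
    last_finished: Option<Instant>,
}

impl TrialGate {
    fn new(max_trials_per_hour: usize, cooldown: Duration) -> Self {
        Self { max_trials_per_hour, cooldown, recent: VecDeque::new(), last_finished: None }
    }

    /// True if a trial may start at `now`; records the start if allowed.
    fn try_start(&mut self, now: Instant) -> bool {
        // Drop start times older than one hour.
        while let Some(&t) = self.recent.front() {
            if now.duration_since(t) > Duration::from_secs(3600) {
                self.recent.pop_front();
            } else {
                break;
            }
        }
        if self.recent.len() >= self.max_trials_per_hour {
            return false; // hourly rate limit hit
        }
        if let Some(fin) = self.last_finished {
            if now.duration_since(fin) < self.cooldown {
                return false; // still cooling down
            }
        }
        self.recent.push_back(now);
        true
    }

    fn mark_finished(&mut self, now: Instant) {
        self.last_finished = Some(now);
    }
}

fn main() {
    let mut gate = TrialGate::new(2, Duration::from_secs(60));
    let t0 = Instant::now();
    assert!(gate.try_start(t0));
    gate.mark_finished(t0);
    // 10s later: still inside the 60s cooldown.
    assert!(!gate.try_start(t0 + Duration::from_secs(10)));
}
```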
The proposer is ε-greedy around the current champion: with probability
0.25 it samples a random config from the full bounds; otherwise it
perturbs the champion by a small ± delta on both axes, deduping new
proposals against history. Deterministic — RNG seeded
from history.len() so the same journal state proposes the same next
config (helps offline replay debugging).
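A minimal sketch of that proposer shape — a hand-rolled LCG stands in for the real RNG, and the `HnswConfig` fields and bounds are hypothetical, but it preserves the seed-from-`history.len()` determinism described above:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct HnswConfig { m: u32, ef_construction: u32 } // hypothetical axes/bounds

/// Tiny deterministic PRNG (LCG) so the sketch needs no external crate.
struct Lcg(u64);
impl Lcg {
    fn next(&mut self) -> u64 {
        self.0 = self.0.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
        self.0
    }
    /// Uniform-ish value in [lo, hi].
    fn range(&mut self, lo: u32, hi: u32) -> u32 {
        lo + (self.next() % u64::from(hi - lo + 1)) as u32
    }
}

/// ε-greedy proposal around the champion, seeded from history length so
/// the same journal state always proposes the same next config.
fn propose(champion: HnswConfig, history: &[HnswConfig]) -> HnswConfig {
    let mut rng = Lcg(history.len() as u64 + 1);
    let explore = rng.next() % 100 < 25; // ε = 0.25
    let mut cand = if explore {
        // Sample from the full bounds.
        HnswConfig { m: rng.range(4, 64), ef_construction: rng.range(32, 512) }
    } else {
        // Perturb the champion ± a small delta on both axes.
        HnswConfig {
            m: champion.m.saturating_add_signed(rng.range(0, 8) as i32 - 4).clamp(4, 64),
            ef_construction: champion.ef_construction
                .saturating_add_signed(rng.range(0, 64) as i32 - 32)
                .clamp(32, 512),
        }
    };
    // Dedup against history: nudge until unseen (bounded retries).
    for _ in 0..16 {
        if !history.contains(&cand) { break; }
        cand.ef_construction = (cand.ef_construction + 1).clamp(32, 512);
    }
    cand
}

fn main() {
    let champ = HnswConfig { m: 16, ef_construction: 200 };
    let history = vec![champ];
    println!("next trial: {:?}", propose(champ, &history));
}
```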
`[agent]` config section in lakehouse.toml; opt-in via enabled=true.
## Federation Layer 2 — runtime bucket lifecycle + per-index scoping
`BucketRegistry.buckets` moved to `std::sync::RwLock<HashMap>` so
buckets can be added/removed after startup. POST /storage/buckets
provisions at runtime; DELETE /storage/buckets/{name} unregisters
(refuses primary/rescue with 403). Local-backend buckets get their
root directory auto-created.
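The lifecycle above reduces to something like this sketch — bucket values simplified to root URIs, and the error enum is hypothetical (the real handlers map the protected case to the 403):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Minimal sketch of a runtime-mutable bucket registry. The real
/// BucketRegistry holds backend configs; here a bucket is just a root URI.
struct BucketRegistry {
    buckets: RwLock<HashMap<String, String>>, // name → root URI
}

#[derive(Debug, PartialEq)]
enum BucketError {
    Protected,     // primary/rescue can't be removed (surfaces as HTTP 403)
    AlreadyExists,
    NotFound,
}

impl BucketRegistry {
    fn new() -> Self {
        let mut m = HashMap::new();
        m.insert("primary".to_string(), "file:///data/primary".to_string());
        m.insert("rescue".to_string(), "file:///data/rescue".to_string());
        Self { buckets: RwLock::new(m) }
    }

    /// Runtime provisioning (POST /storage/buckets path).
    fn register(&self, name: &str, root: &str) -> Result<(), BucketError> {
        let mut b = self.buckets.write().unwrap();
        if b.contains_key(name) {
            return Err(BucketError::AlreadyExists);
        }
        b.insert(name.to_string(), root.to_string());
        Ok(())
    }

    /// Runtime removal (DELETE path); refuses the protected buckets.
    fn unregister(&self, name: &str) -> Result<(), BucketError> {
        if name == "primary" || name == "rescue" {
            return Err(BucketError::Protected);
        }
        let mut b = self.buckets.write().unwrap();
        b.remove(name).map(|_| ()).ok_or(BucketError::NotFound)
    }

    fn resolve(&self, name: &str) -> Option<String> {
        self.buckets.read().unwrap().get(name).cloned()
    }
}

fn main() {
    let reg = BucketRegistry::new();
    reg.register("lance_artifacts", "file:///data/lance").unwrap();
    assert_eq!(reg.unregister("primary"), Err(BucketError::Protected));
}
```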
`IndexMeta.bucket` (default "primary" via serde) records each index's
home bucket. `TrialJournal` and `PromotionRegistry` now hold
Arc<BucketRegistry> + IndexRegistry; they resolve target store per-
index via IndexMeta.bucket. PromotionRegistry::list_all scans every
bucket and dedups by index_name. Pre-federation indexes keep working
unchanged — they just default to primary.
`ModelProfile.bucket: Option<String>` declares per-profile artifact
home. POST /vectors/profile/{id}/activate auto-provisions the
profile's bucket under storage.profile_root if not yet registered.
EvalSets stay primary-only for now — noted gap, low-risk to extend
later with the same resolver pattern.
## Phase 17 — VRAM-aware two-profile gate
Sidecar gains POST /admin/unload (Ollama keep_alive=0 trick — forces
immediate VRAM release), POST /admin/preload (keep_alive=5m with
empty prompt, takes the slot warm), and GET /admin/vram (combines
nvidia-smi snapshot with Ollama /api/ps). Exposed via aibridge as
unload_model / preload_model / vram_snapshot.
`VectorState.active_profile` is the GPU-slot singleton —
Arc<RwLock<Option<ActiveProfileSlot>>>. activate_profile checks for
a previous profile with a different ollama_name and unloads it
before preloading the new one; same-model reactivations skip the
unload (Ollama no-ops). New routes: POST /vectors/profile/{id}/
deactivate (unload + clear slot), GET /vectors/profile/active.
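The swap decision can be sketched standalone — `unload`/`preload` are injected closures here standing in for the sidecar calls, and while the struct name follows the note, the function signature is illustrative:

```rust
/// Sketch of the single-GPU-slot swap. The real code holds the slot in
/// Arc<RwLock<Option<ActiveProfileSlot>>> and calls the sidecar's
/// admin endpoints; closures stand in so this runs standalone.
#[derive(Clone, Debug, PartialEq)]
struct ActiveProfileSlot { profile_id: String, ollama_name: String }

/// Returns the unloaded model name (if any) for observability.
fn activate_profile(
    slot: &mut Option<ActiveProfileSlot>,
    profile_id: &str,
    ollama_name: &str,
    unload: &mut dyn FnMut(&str),
    preload: &mut dyn FnMut(&str),
) -> Option<String> {
    let mut unloaded = None;
    if let Some(prev) = slot.as_ref() {
        // Same model → skip the unload; Ollama no-ops on re-preload.
        if prev.ollama_name != ollama_name {
            unload(&prev.ollama_name);
            unloaded = Some(prev.ollama_name.clone());
        }
    }
    preload(ollama_name);
    *slot = Some(ActiveProfileSlot {
        profile_id: profile_id.to_string(),
        ollama_name: ollama_name.to_string(),
    });
    unloaded
}

fn main() {
    let mut slot: Option<ActiveProfileSlot> = None;
    let mut unload = |m: &str| println!("unload {m}");
    let mut preload = |m: &str| println!("preload {m}");
    activate_profile(&mut slot, "staffing-recruiter", "qwen2.5", &mut unload, &mut preload);
    activate_profile(&mut slot, "docs-assistant", "mistral", &mut unload, &mut preload);
}
```

Reactivating a profile that shares the model with the current one hits the same-name branch, which is exactly the nomic-embed-text persistence observed below.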
Verified live: staffing-recruiter (qwen2.5) → docs-assistant
(mistral) swap freed qwen2.5 from VRAM and loaded mistral. nomic-
embed-text persists across swaps because both profiles use it —
free optimization that fell out of the design. Scoped search
correctly 403s cross-profile in both directions.
## MySQL streaming connector
`crates/ingestd/src/my_stream.rs` mirrors pg_stream.rs for MySQL.
Pure-Rust `mysql_async` driver (default-features=false to avoid C
deps). Same OFFSET pagination, same Parquet-streaming write shape.
Type mapping per ADR-010: int/bigint → Int32/Int64, decimal/float
→ Float64, tinyint(1)/bool → Boolean, everything else → Utf8 with
fallback parsers for date/time/json/uuid via Display.
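That mapping, sketched with a stand-in for Arrow's `DataType` (the real connector matches `mysql_async` column metadata; the `display_width` parameter is an assumption about how tinyint(1) gets distinguished):

```rust
/// Stand-in for arrow's DataType — just the variants the mapping needs.
#[derive(Debug, PartialEq)]
enum ColType { Int32, Int64, Float64, Boolean, Utf8 }

/// Map a MySQL column type name per the ADR-010 table above.
/// tinyint(1) is MySQL's conventional BOOL; wider tinyints stay integral.
fn map_mysql_type(name: &str, display_width: Option<u32>) -> ColType {
    match name.to_ascii_lowercase().as_str() {
        "tinyint" if display_width == Some(1) => ColType::Boolean,
        "bool" | "boolean" => ColType::Boolean,
        "tinyint" | "smallint" | "mediumint" | "int" | "integer" => ColType::Int32,
        "bigint" => ColType::Int64,
        "decimal" | "numeric" | "float" | "double" => ColType::Float64,
        // date/time/json/uuid etc. fall through to Utf8 via Display.
        _ => ColType::Utf8,
    }
}

fn main() {
    assert_eq!(map_mysql_type("TINYINT", Some(1)), ColType::Boolean);
    assert_eq!(map_mysql_type("datetime", None), ColType::Utf8);
}
```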
POST /ingest/mysql parallel to /ingest/db. Same PII auto-detection,
same lineage capture (source_system="mysql"), same agent-trigger
hook. `redact_dsn` generalized — was hardcoded to "postgresql://"
length, now works for any scheme://user:pass@host/path URL (latent
PII leak fix for MySQL DSNs).
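A scheme-agnostic `redact_dsn` along those lines — a sketch, not the shipped function — locates the userinfo between `://` and the last `@` before the path instead of assuming a fixed-length prefix:

```rust
/// Redact the password in any scheme://user:pass@host/path DSN.
/// Scheme-agnostic: finds the userinfo section between "://" and the
/// last '@' in the authority, rather than a hardcoded prefix length.
fn redact_dsn(dsn: &str) -> String {
    let Some(scheme_end) = dsn.find("://") else { return dsn.to_string(); };
    let rest = &dsn[scheme_end + 3..];
    // The authority ends at the first '/'; userinfo ends at the last '@'
    // before that (rfind tolerates '@' inside the password itself).
    let authority_end = rest.find('/').unwrap_or(rest.len());
    let Some(at) = rest[..authority_end].rfind('@') else { return dsn.to_string(); };
    let userinfo = &rest[..at];
    match userinfo.find(':') {
        Some(colon) => format!(
            "{}{}:****{}",
            &dsn[..scheme_end + 3],
            &userinfo[..colon],
            &rest[at..],
        ),
        None => dsn.to_string(), // no password present
    }
}

fn main() {
    println!("{}", redact_dsn("mysql://root:s3cret@localhost:3306/testdb"));
}
```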
Verified live against MariaDB on localhost: 10 rows × 9 columns of
test data round-tripped through datatypes int/varchar/decimal/
tinyint/datetime/text. PII detection auto-flagged name + email.
Aggregation queries through DataFusion match the source values
exactly.
## Phase 18 — Hybrid Parquet+HNSW ⊕ Lance backend (ADR-019)
`vectord-lance` is a new firewall crate. Lance pulls Arrow 57 and
DataFusion 52 — incompatible with the rest of the workspace's
Arrow 55 / DataFusion 47. The firewall isolates that dep tree:
public API uses only std types (Vec<f32>, Vec<String>, Hit, Row,
*Stats), so no Arrow types cross the crate boundary and nothing
propagates to vectord. This is the ADR-019 path that didn't ship until now.
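The boundary can be illustrated with an in-memory stand-in: the trait below exposes only std types, as described above, while a real Lance-backed impl would convert to Arrow internally. The trait shape, `MemStore`, and everything beyond the `Hit` name are hypothetical:

```rust
/// Std-only result type crossing the firewall boundary — no Arrow here.
#[derive(Debug, Clone, PartialEq)]
pub struct Hit {
    pub doc_id: String,
    pub chunk_text: String,
    pub distance: f32,
}

/// Illustrative boundary trait: Vec<f32> in, plain structs out.
pub trait VectorBackend {
    fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<Hit>, String>;
    fn append(&mut self, rows: Vec<(String, String, Vec<f32>)>) -> Result<usize, String>;
    fn row_count(&self) -> usize;
}

/// In-memory stand-in so the sketch runs without Lance: brute-force L2.
pub struct MemStore { rows: Vec<(String, String, Vec<f32>)> }

impl VectorBackend for MemStore {
    fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<Hit>, String> {
        let mut hits: Vec<Hit> = self.rows.iter().map(|(id, text, v)| Hit {
            doc_id: id.clone(),
            chunk_text: text.clone(),
            distance: v.iter().zip(&query).map(|(a, b)| (a - b).powi(2)).sum::<f32>(),
        }).collect();
        hits.sort_by(|a, b| a.distance.total_cmp(&b.distance));
        hits.truncate(k);
        Ok(hits)
    }
    fn append(&mut self, rows: Vec<(String, String, Vec<f32>)>) -> Result<usize, String> {
        self.rows.extend(rows);
        Ok(self.rows.len())
    }
    fn row_count(&self) -> usize { self.rows.len() }
}

fn main() {
    let mut store = MemStore { rows: Vec::new() };
    store.append(vec![("d1".into(), "alpha".into(), vec![0.0, 0.0])]).unwrap();
    println!("{} rows", store.row_count());
}
```

Because nothing in the public surface names an Arrow type, the two dependency trees never meet in a caller's type signatures.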
`vectord::lance_backend::LanceRegistry` lazy-creates a
LanceVectorStore per index, resolving bucket → URI via the
conventional local-bucket layout. `IndexMeta.vector_backend` and
`ModelProfile.vector_backend` carry the choice (default Parquet so
existing indexes unchanged).
Six routes under /vectors/lance/*:
- migrate/{idx}: convert binary-blob Parquet → Lance FixedSizeList
- index/{idx}: build IVF_PQ
- search/{idx}: vector search (embed via sidecar)
- doc/{idx}/{doc_id}: random row fetch
- append/{idx}: native fragment append
- stats/{idx}: row count + index presence
Verified live on the real resumes_100k_v2 corpus (100K × 768d):
- Migrate: 0.57s
- Build IVF_PQ index: 16.2s (matches ADR-019 bench; 14× faster than
HNSW's 230s for the same data)
- Search end-to-end (Ollama embed + Lance scan): 23-53ms
- Random doc_id fetch: 5-7ms (filter scan; faster than Parquet's
~35ms full-file scan, slower than the bench's 311us positional
take — would close that gap with a scalar btree on doc_id)
- Append 100 rows: 3.3ms / +320KB on disk vs Parquet's required
full ~330MB rewrite — the structural win
- Index survives append; both backends coexist cleanly
## Known follow-ups not in this milestone
- ModelProfile.vector_backend doesn't yet auto-route /vectors/profile/
{id}/search to Lance; callers go through /vectors/lance/* directly
- Scalar btree on doc_id (closes the 5-7ms → ~300us gap)
- vectord-lance built default-features=false → no S3 yet
- IVF_PQ recall not measured (ADR-019 caveat) — needs a Lance-aware
variant of the eval harness
- Watcher-path ingest doesn't push agent triggers (HTTP paths do)
- EvalSets still primary-only (federation gap)
- No PATCH endpoint to move an existing index between buckets
- The pre-existing storaged::append_log doctest fails to compile
(malformed `{prefix}/` parses as code fence) — pre-existing bug,
left for a focused fix
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
313 lines
12 KiB
Rust
//! Phase C: Decoupled embedding refresh.
//!
//! When a dataset's row-level data changes, the vector index it feeds is
//! stale. Historically we coupled ingest and embedding — writing new rows
//! also re-embedded every row, which is fine at 500 rows but blows up at
//! 100K+. The llms3.com architecture calls out "asynchronous vector
//! refresh cycles independent from transactional mutations" as the right
//! pattern.
//!
//! This module implements the refresh side. The ingest path marks
//! embeddings stale (see catalogd::registry::mark_embeddings_stale); this
//! code clears that staleness by embedding only rows whose `doc_id` isn't
//! already in the existing vector index.
//!
//! Scope — MVP:
//! - Reads the dataset's Parquet, extracts (doc_id, text) pairs from named
//!   columns
//! - Loads existing embeddings via EmbeddingCache
//! - Filters to rows whose doc_id is NOT in the existing set
//! - Chunks, embeds via Ollama, appends to the index parquet
//! - Clears stale flag on success
//!
//! Not in MVP:
//! - UPDATE semantics (same doc_id, new content) — would need content-hash
//!   comparison per row
//! - Large-scale resilience (batching with checkpoints like the
//!   supervisor) — MVP does it inline
//! - Lance backend — ADR-019 makes this straightforward later; MVP stays
//!   on Parquet sidecar indexes

use std::collections::HashSet;
use std::sync::Arc;

use aibridge::client::{AiClient, EmbedRequest};
use arrow::array::{Array, Int32Array, Int64Array, StringArray};
use catalogd::registry::Registry;
use object_store::ObjectStore;

use crate::chunker::{self, TextChunk};
use crate::embedding_cache::EmbeddingCache;
use crate::index_registry::IndexRegistry;
use crate::store::{self, StoredEmbedding};

#[derive(Debug, Clone, serde::Deserialize)]
pub struct RefreshRequest {
    pub index_name: String,
    /// Column with the document id (row identity).
    pub id_column: String,
    /// Column with the text to embed.
    pub text_column: String,
    /// Chunk size (chars). Defaults to 500.
    #[serde(default)]
    pub chunk_size: Option<usize>,
    /// Overlap (chars). Defaults to 50.
    #[serde(default)]
    pub overlap: Option<usize>,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct RefreshResult {
    pub index_name: String,
    pub dataset_name: String,
    pub pre_existing_docs: usize,
    pub dataset_docs: usize,
    pub new_docs_embedded: usize,
    pub new_chunks_embedded: usize,
    pub total_embeddings_after: usize,
    pub duration_secs: f32,
    pub stale_cleared: bool,
}

/// Full refresh pipeline. Takes dataset_name as URL param, body has the
/// column selectors.
pub async fn refresh_index(
    dataset_name: &str,
    req: &RefreshRequest,
    store_: &Arc<dyn ObjectStore>,
    registry: &Registry,
    ai_client: &AiClient,
    embedding_cache: &EmbeddingCache,
    index_registry: &IndexRegistry,
) -> Result<RefreshResult, String> {
    let t0 = std::time::Instant::now();

    // 1. Find the dataset manifest, pull its object storage key
    let manifest = registry
        .get_by_name(dataset_name)
        .await
        .ok_or_else(|| format!("dataset not found: {dataset_name}"))?;

    if manifest.objects.is_empty() {
        return Err(format!("dataset '{dataset_name}' has no object references"));
    }

    // 2. Read dataset rows — extract (doc_id, text) pairs from the
    //    specified columns. Multi-object datasets: concat across all.
    let mut doc_id_to_text: Vec<(String, String)> = Vec::new();
    for obj in &manifest.objects {
        let data = storaged::ops::get(store_, &obj.key).await
            .map_err(|e| format!("read {}: {e}", obj.key))?;
        let (schema, batches) = shared::arrow_helpers::parquet_to_record_batches(&data)
            .map_err(|e| format!("parse {}: {e}", obj.key))?;

        let id_idx = schema.index_of(&req.id_column)
            .map_err(|_| format!("id column '{}' not in dataset schema", req.id_column))?;
        let text_idx = schema.index_of(&req.text_column)
            .map_err(|_| format!("text column '{}' not in dataset schema", req.text_column))?;

        for batch in &batches {
            let text_col = batch
                .column(text_idx)
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| format!("text column '{}' is not Utf8", req.text_column))?;

            // Accept Utf8, Int32, or Int64 as id — that covers CSV-derived
            // and Postgres-imported schemas without forcing upstream casts.
            let id_reader: Box<dyn Fn(usize) -> Option<String>> = {
                let col = batch.column(id_idx);
                if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
                    let s = s.clone();
                    Box::new(move |row| if s.is_null(row) { None } else { Some(s.value(row).to_string()) })
                } else if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
                    let a = a.clone();
                    Box::new(move |row| if a.is_null(row) { None } else { Some(a.value(row).to_string()) })
                } else if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
                    let a = a.clone();
                    Box::new(move |row| if a.is_null(row) { None } else { Some(a.value(row).to_string()) })
                } else {
                    return Err(format!(
                        "id column '{}' must be Utf8, Int32, or Int64 — got {}",
                        req.id_column,
                        col.data_type(),
                    ));
                }
            };

            for row in 0..batch.num_rows() {
                if text_col.is_null(row) { continue; }
                let Some(id) = id_reader(row) else { continue; };
                let text = text_col.value(row).to_string();
                if text.trim().is_empty() { continue; }
                doc_id_to_text.push((id, text));
            }
        }
    }
    let dataset_docs = doc_id_to_text.len();
    tracing::info!("refresh '{}': dataset has {dataset_docs} rows", dataset_name);

    // 3. Load existing embeddings (empty if no index yet)
    let existing: Vec<StoredEmbedding> = match embedding_cache.get_or_load(&req.index_name).await {
        Ok(arc) => arc.as_ref().clone(),
        Err(_) => Vec::new(), // first-time index build
    };
    let pre_existing_docs: HashSet<String> = existing
        .iter()
        .map(|e| e.doc_id.clone())
        .collect();
    let pre_existing_count = pre_existing_docs.len();

    // 4. Delta — rows whose doc_id isn't already indexed
    let new_rows: Vec<(String, String)> = doc_id_to_text
        .into_iter()
        .filter(|(id, _)| !pre_existing_docs.contains(id))
        .collect();
    let new_docs = new_rows.len();

    if new_docs == 0 {
        tracing::info!("refresh '{}': no new docs to embed", dataset_name);
        // Even on no-op, make sure index metadata is registered so
        // downstream discovery (profile activation, A/B search) sees
        // indexes built by earlier refresh calls that predated the
        // auto-register behavior.
        let _ = try_update_index_meta(index_registry, &req.index_name, existing.len()).await;
        registry.clear_embeddings_stale(dataset_name).await?;
        return Ok(RefreshResult {
            index_name: req.index_name.clone(),
            dataset_name: dataset_name.to_string(),
            pre_existing_docs: pre_existing_count,
            dataset_docs,
            new_docs_embedded: 0,
            new_chunks_embedded: 0,
            total_embeddings_after: existing.len(),
            duration_secs: t0.elapsed().as_secs_f32(),
            stale_cleared: true,
        });
    }

    // 5. Chunk the new rows
    let chunk_size = req.chunk_size.unwrap_or(500);
    let overlap = req.overlap.unwrap_or(50);
    let doc_ids: Vec<String> = new_rows.iter().map(|(id, _)| id.clone()).collect();
    let texts: Vec<String> = new_rows.iter().map(|(_, t)| t.clone()).collect();
    let chunks: Vec<TextChunk> = chunker::chunk_column(
        dataset_name, &doc_ids, &texts, chunk_size, overlap,
    );
    let new_chunks = chunks.len();
    tracing::info!("refresh '{}': {} new docs -> {} chunks", dataset_name, new_docs, new_chunks);

    // 6. Embed via Ollama (batched)
    let batch_size = 32;
    let mut all_vectors: Vec<Vec<f64>> = Vec::with_capacity(new_chunks);
    for batch in chunks.chunks(batch_size) {
        let batch_texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
        let resp = ai_client
            .embed(EmbedRequest { texts: batch_texts, model: None })
            .await
            .map_err(|e| format!("embed: {e}"))?;
        all_vectors.extend(resp.embeddings);
    }

    // 7. Combine existing + new and write back as a single parquet
    //    (MVP — append-as-rewrite. ADR-019 points to Lance for true native
    //    append; for the Parquet sidecar we rewrite at refresh time.)
    let mut new_stored: Vec<StoredEmbedding> = existing.clone();
    for (chunk, vector) in chunks.into_iter().zip(all_vectors.iter()) {
        new_stored.push(StoredEmbedding {
            source: chunk.source,
            doc_id: chunk.doc_id,
            chunk_idx: chunk.chunk_idx,
            chunk_text: chunk.text,
            vector: vector.iter().map(|&x| x as f32).collect(),
        });
    }

    // We have to reconstruct chunks + vectors for store_embeddings — but
    // store_embeddings takes &[TextChunk] and vectors separately. Convert
    // the combined StoredEmbedding back to that shape.
    let combined_chunks: Vec<TextChunk> = new_stored
        .iter()
        .map(|e| TextChunk {
            source: e.source.clone(),
            doc_id: e.doc_id.clone(),
            chunk_idx: e.chunk_idx,
            text: e.chunk_text.clone(),
        })
        .collect();
    let combined_vectors: Vec<Vec<f64>> = new_stored
        .iter()
        .map(|e| e.vector.iter().map(|&f| f as f64).collect())
        .collect();

    let _key = store::store_embeddings(
        store_, &req.index_name, &combined_chunks, &combined_vectors,
    ).await?;

    // 8. Evict embedding cache — next read will pick up the new file
    let _ = embedding_cache.evict(&req.index_name).await;

    // 9. Update index registry metadata (row/chunk counts; others unchanged
    //    so we don't disturb the existing entry if present)
    let _ = try_update_index_meta(index_registry, &req.index_name, new_stored.len()).await;

    // 10. Clear stale flag
    registry.clear_embeddings_stale(dataset_name).await?;

    let total = new_stored.len();
    Ok(RefreshResult {
        index_name: req.index_name.clone(),
        dataset_name: dataset_name.to_string(),
        pre_existing_docs: pre_existing_count,
        dataset_docs,
        new_docs_embedded: new_docs,
        new_chunks_embedded: new_chunks,
        total_embeddings_after: total,
        duration_secs: t0.elapsed().as_secs_f32(),
        stale_cleared: true,
    })
}

/// Best-effort refresh of index registry metadata.
/// - If the index already exists, bump the chunk_count.
/// - If it's brand new (first-time build via refresh), REGISTER it so
///   downstream discovery (profile activation, A/B search) can find it.
async fn try_update_index_meta(
    index_registry: &IndexRegistry,
    index_name: &str,
    chunk_count: usize,
) -> Result<(), String> {
    if let Some(mut meta) = index_registry.get(index_name).await {
        meta.chunk_count = chunk_count;
        return index_registry.register(meta).await;
    }
    // First-time registration — infer reasonable defaults. Convention:
    // index name `{source}_v1` or `{source}_vN` implies a source dataset
    // named by stripping the `_vN` suffix. Otherwise use the index name
    // as the source and let the caller patch later.
    let source = match index_name.rsplit_once('_') {
        Some((base, suffix)) if suffix.starts_with('v') && suffix[1..].chars().all(|c| c.is_ascii_digit()) => {
            base.to_string()
        }
        _ => index_name.to_string(),
    };
    let meta = crate::index_registry::IndexMeta {
        index_name: index_name.to_string(),
        source,
        model_name: "nomic-embed-text".to_string(),
        model_version: "latest".to_string(),
        dimensions: 768,
        chunk_count,
        doc_count: chunk_count,
        chunk_size: 500,
        overlap: 50,
        storage_key: format!("vectors/{index_name}.parquet"),
        created_at: chrono::Utc::now(),
        build_time_secs: 0.0,
        chunks_per_sec: 0.0,
        bucket: "primary".to_string(),
        vector_backend: shared::types::VectorBackend::Parquet,
    };
    index_registry.register(meta).await
}