Five threads of work landing as one milestone — all individually
verified end-to-end against real data, full release build clean,
46 unit tests pass.
## Phase 16.2 / 16.5 — autotune agent + ingest triggers
`vectord::agent` is a long-running tokio task that watches the trial
journal and autonomously proposes + runs new HNSW configs. Distinct
from `autotune::run_autotune` (synchronous one-shot grid). Triggered
on POST /vectors/agent/enqueue/{idx} or by the periodic wake; ingest
paths now push DatasetAppended events when an index's source dataset
gets re-ingested. Rate-limited (max_trials_per_hour) and cooldown-
gated so it can't saturate Ollama under live load.
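For orientation, the trigger side reduces to one event type plus a rate/cooldown gate. A minimal sketch (the `AgentEvent` variants and `AgentGate` fields are illustrative stand-ins, not the real identifiers):

```rust
use std::time::{Duration, Instant};

// Hypothetical sketch: enqueue requests, dataset re-ingest notifications,
// and the periodic wake all funnel into one event type; the agent gates
// each wake on the hourly rate limit plus a cooldown since the last trial.
enum AgentEvent {
    Enqueue { index: String },          // POST /vectors/agent/enqueue/{idx}
    DatasetAppended { index: String },  // pushed by ingest paths on re-ingest
    PeriodicWake,
}

struct AgentGate {
    max_trials_per_hour: u32,
    cooldown: Duration,
    recent: Vec<Instant>, // start times of trials in the trailing hour
}

impl AgentGate {
    fn allow(&mut self, now: Instant) -> bool {
        self.recent
            .retain(|t| now.duration_since(*t) < Duration::from_secs(3600));
        let cooled = self
            .recent
            .last()
            .map_or(true, |t| now.duration_since(*t) >= self.cooldown);
        if cooled && (self.recent.len() as u32) < self.max_trials_per_hour {
            self.recent.push(now);
            true
        } else {
            false
        }
    }
}
```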
The proposer is ε-greedy around the current champion: with probability 0.25
it samples a fresh config at random from the full bounds; otherwise it
perturbs the champion by a small delta on both axes. Candidates are deduped
against history. Proposals are deterministic: the RNG is seeded from
history.len(), so the same journal state always proposes the same next
config (helps offline replay debugging).
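A sketch of that proposal step, assuming the two tuned axes are `m` and `ef_construction` and using illustrative bounds; only the 0.25 split, the history dedup, and the history-length seeding come from the actual design:

```rust
use rand::{rngs::StdRng, Rng, SeedableRng};

// Hypothetical shape of an HNSW config with the two tuned axes.
#[derive(Clone, PartialEq)]
struct HnswConfig { m: usize, ef_construction: usize }

/// ε-greedy proposal around the current champion. Deterministic: the RNG is
/// seeded from history.len(), so the same journal state yields the same
/// next candidate (useful for offline replay).
fn propose(champion: &HnswConfig, history: &[HnswConfig]) -> Option<HnswConfig> {
    let mut rng = StdRng::seed_from_u64(history.len() as u64);
    for _ in 0..32 {
        // retry a few times to get past the history dedup
        let candidate = if rng.gen_bool(0.25) {
            // explore: sample from the full bounds (bounds are illustrative)
            HnswConfig {
                m: rng.gen_range(8..=64),
                ef_construction: rng.gen_range(64..=512),
            }
        } else {
            // exploit: perturb the champion by a small delta on both axes
            HnswConfig {
                m: (champion.m as i64 + rng.gen_range(-4..=4)).clamp(8, 64) as usize,
                ef_construction: (champion.ef_construction as i64
                    + rng.gen_range(-32..=32)).clamp(64, 512) as usize,
            }
        };
        if !history.contains(&candidate) {
            return Some(candidate);
        }
    }
    None // everything nearby has already been tried
}
```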
`[agent]` config section in lakehouse.toml; opt-in via enabled=true.
## Federation Layer 2 — runtime bucket lifecycle + per-index scoping
`BucketRegistry.buckets` moved to `std::sync::RwLock<HashMap>` so
buckets can be added/removed after startup. POST /storage/buckets
provisions at runtime; DELETE /storage/buckets/{name} unregisters
(refuses primary/rescue with 403). Local-backend buckets get their
root directory auto-created.
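The lifecycle change is mostly the interior lock plus the protected-bucket guard. A rough sketch (method names and error shapes are assumptions; the 403 is produced by the HTTP handler, not the registry):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Stand-in alias for whatever object_store handle a bucket resolves to.
type Store = Arc<dyn object_store::ObjectStore>;

pub struct BucketRegistry {
    // Interior mutability so buckets can be added/removed after startup
    // without threading &mut through every handler.
    buckets: RwLock<HashMap<String, Store>>,
}

impl BucketRegistry {
    pub fn register(&self, name: &str, store: Store) {
        self.buckets.write().unwrap().insert(name.to_string(), store);
    }

    pub fn unregister(&self, name: &str) -> Result<(), String> {
        if name == "primary" || name == "rescue" {
            // Surfaced as 403 by DELETE /storage/buckets/{name}.
            return Err(format!("bucket '{name}' is protected"));
        }
        self.buckets.write().unwrap().remove(name);
        Ok(())
    }

    pub fn get(&self, name: &str) -> Result<Store, String> {
        self.buckets.read().unwrap().get(name).cloned()
            .ok_or_else(|| format!("unknown bucket '{name}'"))
    }
}
```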
`IndexMeta.bucket` (default "primary" via serde) records each index's
home bucket. `TrialJournal` and `PromotionRegistry` now hold
Arc<BucketRegistry> + IndexRegistry; they resolve target store per-
index via IndexMeta.bucket. PromotionRegistry::list_all scans every
bucket and dedups by index_name. Pre-federation indexes keep working
unchanged — they just default to primary.
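The backwards compatibility is just a serde default on the metadata; a sketch with the field set trimmed (`default_bucket` is an illustrative helper name). The PromotionRegistry source included below shows the consuming side in `store_for`.

```rust
use serde::{Deserialize, Serialize};

fn default_bucket() -> String { "primary".to_string() }

#[derive(Serialize, Deserialize)]
pub struct IndexMeta {
    pub name: String,
    // Pre-federation metadata files have no `bucket` field; serde fills in
    // "primary" so existing indexes keep resolving to the old store.
    #[serde(default = "default_bucket")]
    pub bucket: String,
    // ...other fields elided
}
```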
`ModelProfile.bucket: Option<String>` declares per-profile artifact
home. POST /vectors/profile/{id}/activate auto-provisions the
profile's bucket under storage.profile_root if not yet registered.
EvalSets stay primary-only for now — noted gap, low-risk to extend
later with the same resolver pattern.
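Auto-provisioning on activate is roughly this check (sketch: the directory convention and helper names are assumptions, and `register`/`get` follow the registry sketch above):

```rust
// Inside the activate handler, before any artifact I/O: make sure the
// profile's declared bucket exists, creating a local one on first use.
fn ensure_profile_bucket(
    registry: &BucketRegistry,
    profile_bucket: Option<&str>,
    profile_root: &std::path::Path,
) -> Result<String, String> {
    let name = match profile_bucket {
        Some(b) => b.to_string(),
        None => return Ok("primary".to_string()), // no per-profile home declared
    };
    if registry.get(&name).is_err() {
        // Illustrative layout: <storage.profile_root>/<bucket-name>/
        let root = profile_root.join(&name);
        std::fs::create_dir_all(&root).map_err(|e| e.to_string())?;
        let store = object_store::local::LocalFileSystem::new_with_prefix(&root)
            .map_err(|e| e.to_string())?;
        registry.register(&name, std::sync::Arc::new(store));
    }
    Ok(name)
}
```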
## Phase 17 — VRAM-aware two-profile gate
Sidecar gains POST /admin/unload (the Ollama keep_alive=0 trick, forcing
immediate VRAM release), POST /admin/preload (keep_alive=5m with an empty
prompt, warming the model into the slot), and GET /admin/vram (combines an
nvidia-smi snapshot with Ollama /api/ps). Exposed via aibridge as
unload_model / preload_model / vram_snapshot.
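Under the hood this is just Ollama's documented keep_alive knob on a prompt-less /api/generate call. A reqwest-flavored sketch (error handling trimmed; the sidecar's actual plumbing may differ):

```rust
use serde_json::json;

// keep_alive = 0 tells Ollama to evict the model immediately (frees VRAM);
// keep_alive = "5m" with no prompt just loads the model and holds it warm.
async fn set_keep_alive(
    client: &reqwest::Client,
    base: &str,
    model: &str,
    keep_alive: serde_json::Value,
) -> Result<(), reqwest::Error> {
    client.post(format!("{base}/api/generate"))
        .json(&json!({ "model": model, "keep_alive": keep_alive }))
        .send().await?
        .error_for_status()?;
    Ok(())
}

async fn unload(client: &reqwest::Client, base: &str, model: &str) -> Result<(), reqwest::Error> {
    set_keep_alive(client, base, model, json!(0)).await
}

async fn preload(client: &reqwest::Client, base: &str, model: &str) -> Result<(), reqwest::Error> {
    set_keep_alive(client, base, model, json!("5m")).await
}

// GET /admin/vram combines this with an nvidia-smi snapshot.
async fn loaded_models(client: &reqwest::Client, base: &str) -> Result<serde_json::Value, reqwest::Error> {
    client.get(format!("{base}/api/ps")).send().await?.json().await
}
```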
`VectorState.active_profile` is the GPU-slot singleton —
Arc<RwLock<Option<ActiveProfileSlot>>>. activate_profile checks for
a previous profile with a different ollama_name and unloads it
before preloading the new one; same-model reactivations skip the
unload (Ollama no-ops). New routes: POST /vectors/profile/{id}/
deactivate (unload + clear slot), GET /vectors/profile/active.
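The swap itself is a short critical section. A sketch under assumed types (`aibridge::Client` stands in for whatever handle exposes the unload_model/preload_model calls above; tokio's RwLock for the slot):

```rust
// Sketch of the GPU-slot singleton swap. `ActiveProfileSlot` here just
// carries the profile id and its Ollama model name.
struct ActiveProfileSlot { profile_id: String, ollama_name: String }

async fn swap_slot(
    slot: &tokio::sync::RwLock<Option<ActiveProfileSlot>>,
    bridge: &aibridge::Client, // assumed handle exposing unload/preload
    new: ActiveProfileSlot,
) -> Result<(), String> {
    let mut guard = slot.write().await;
    match guard.as_ref() {
        // Same model already resident: skip the unload, Ollama no-ops anyway.
        Some(prev) if prev.ollama_name == new.ollama_name => {}
        // A different model holds the slot: evict it before warming the new one.
        Some(prev) => {
            bridge.unload_model(&prev.ollama_name).await?;
            bridge.preload_model(&new.ollama_name).await?;
        }
        // Empty slot: just warm the new model.
        None => bridge.preload_model(&new.ollama_name).await?,
    }
    *guard = Some(new);
    Ok(())
}
```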
Verified live: the staffing-recruiter (qwen2.5) → docs-assistant (mistral)
swap freed qwen2.5 from VRAM and loaded mistral. nomic-embed-text persists
across swaps because both profiles use it, a free optimization that fell
out of the design. Scoped search correctly returns 403 for cross-profile
queries in both directions.
## MySQL streaming connector
`crates/ingestd/src/my_stream.rs` mirrors pg_stream.rs for MySQL.
Pure-Rust `mysql_async` driver (default-features=false to avoid C
deps). Same OFFSET pagination, same Parquet-streaming write shape.
Type mapping per ADR-010: int/bigint → Int32/Int64, decimal/float
→ Float64, tinyint(1)/bool → Boolean, everything else → Utf8 with
fallback parsers for date/time/json/uuid via Display.
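That mapping reduces to a small match. Illustrative sketch against arrow's DataType (the real code inspects mysql_async column metadata rather than type-name strings):

```rust
use arrow::datatypes::DataType;

// Rough shape of the ADR-010 type mapping: native numeric/boolean types map
// directly, everything else falls back to Utf8 and is rendered via Display
// (dates, times, json, uuid, ...).
fn arrow_type_for(mysql_type: &str, display_width: Option<u32>) -> DataType {
    match mysql_type {
        "tinyint" if display_width == Some(1) => DataType::Boolean,
        "bool" | "boolean" => DataType::Boolean,
        "int" | "integer" | "mediumint" | "smallint" | "tinyint" => DataType::Int32,
        "bigint" => DataType::Int64,
        "decimal" | "numeric" | "float" | "double" => DataType::Float64,
        _ => DataType::Utf8, // date/time/json/uuid/text etc. via Display
    }
}
```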
POST /ingest/mysql parallel to /ingest/db. Same PII auto-detection,
same lineage capture (source_system="mysql"), same agent-trigger
hook. `redact_dsn` generalized: it was hardcoded to the length of
"postgresql://" and now works for any scheme://user:pass@host/path URL
(a latent PII-leak fix for MySQL DSNs).
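A scheme-agnostic redaction along these lines (sketch; the shipped function may handle more edge cases):

```rust
/// Redact the password in any `scheme://user:pass@host/path` DSN before it
/// lands in lineage records or logs. Leaves DSNs without credentials alone.
fn redact_dsn(dsn: &str) -> String {
    let Some(scheme_end) = dsn.find("://") else { return dsn.to_string() };
    let rest = &dsn[scheme_end + 3..];
    // Only look at the authority part (before the first '/').
    let authority_end = rest.find('/').unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    match (authority.find(':'), authority.find('@')) {
        (Some(colon), Some(at)) if colon < at => {
            let start = scheme_end + 3 + colon + 1; // first char of the password
            let end = scheme_end + 3 + at;          // the '@'
            format!("{}***{}", &dsn[..start], &dsn[end..])
        }
        _ => dsn.to_string(),
    }
}
```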
Verified live against MariaDB on localhost: 10 rows × 9 columns of
test data round-tripped through datatypes int/varchar/decimal/
tinyint/datetime/text. PII detection auto-flagged name + email.
Aggregation queries through DataFusion match the source values
exactly.
## Phase 18 — Hybrid Parquet+HNSW ⊕ Lance backend (ADR-019)
`vectord-lance` is a new firewall crate. Lance pulls Arrow 57 and
DataFusion 52 — incompatible with the rest of the workspace's
Arrow 55 / DataFusion 47. The firewall isolates that dep tree:
public API uses only std types (Vec<f32>, Vec<String>, Hit, Row,
*Stats), so no Arrow types cross the crate boundary and nothing
propagates to vectord. This is the ADR-019 path that hadn't shipped until now.
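The boundary is easiest to see in the public signatures: nothing Arrow-typed escapes. A shape-only sketch (field names and exact signatures are assumptions; bodies elided):

```rust
// vectord-lance public surface: only std types cross the crate boundary, so
// the Arrow 57 / DataFusion 52 dependency tree never leaks into vectord.
pub struct Hit { pub doc_id: String, pub score: f32 }
pub struct Row { pub doc_id: String, pub fields: Vec<(String, String)> }
pub struct IndexStats { pub rows: u64, pub has_vector_index: bool }

pub struct LanceVectorStore { /* Lance dataset handle, kept private */ }

impl LanceVectorStore {
    pub async fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<Hit>, String> { todo!() }
    pub async fn get_doc(&self, doc_id: &str) -> Result<Option<Row>, String> { todo!() }
    pub async fn append(&self, doc_ids: Vec<String>, vectors: Vec<Vec<f32>>) -> Result<(), String> { todo!() }
    pub async fn build_ivf_pq(&self) -> Result<(), String> { todo!() }
    pub async fn stats(&self) -> Result<IndexStats, String> { todo!() }
}
```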
`vectord::lance_backend::LanceRegistry` lazy-creates a
LanceVectorStore per index, resolving bucket → URI via the
conventional local-bucket layout. `IndexMeta.vector_backend` and
`ModelProfile.vector_backend` carry the choice (default Parquet so
existing indexes unchanged).
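On the vectord side the registry part is a keyed lazy cache. Sketch (the URI convention and the `open` constructor are assumptions):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

// Lazily builds one LanceVectorStore per index, resolving the index's home
// bucket to a dataset URI under the conventional local-bucket layout.
pub struct LanceRegistry {
    stores: Mutex<HashMap<String, Arc<LanceVectorStore>>>,
}

impl LanceRegistry {
    pub async fn store_for(
        &self,
        index: &str,
        bucket_root: &std::path::Path,
    ) -> Result<Arc<LanceVectorStore>, String> {
        let mut stores = self.stores.lock().await;
        if let Some(s) = stores.get(index) {
            return Ok(s.clone());
        }
        // e.g. <bucket_root>/_lance/<index>.lance (illustrative convention);
        // `open` is a hypothetical constructor on the firewall type above.
        let uri = bucket_root.join("_lance").join(format!("{index}.lance"));
        let store = Arc::new(LanceVectorStore::open(uri.to_string_lossy().as_ref()).await?);
        stores.insert(index.to_string(), store.clone());
        Ok(store)
    }
}
```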
Six routes under /vectors/lance/* (usage sketch after the list):
- migrate/{idx}: convert binary-blob Parquet → Lance FixedSizeList
- index/{idx}: build IVF_PQ
- search/{idx}: vector search (embed via sidecar)
- doc/{idx}/{doc_id}: random row fetch
- append/{idx}: native fragment append
- stats/{idx}: row count + index presence
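A usage walkthrough over those routes (paths are the ones listed; request and response bodies are assumptions):

```rust
use serde_json::json;

// End-to-end sketch against a running vectord: migrate, build the IVF_PQ
// index, run a search, then read stats.
async fn lance_walkthrough(
    c: &reqwest::Client,
    base: &str,
    idx: &str,
) -> Result<(), reqwest::Error> {
    // 1. Convert the existing binary-blob Parquet vectors into a Lance dataset.
    c.post(format!("{base}/vectors/lance/migrate/{idx}")).send().await?.error_for_status()?;
    // 2. Build the IVF_PQ index on the migrated dataset.
    c.post(format!("{base}/vectors/lance/index/{idx}")).send().await?.error_for_status()?;
    // 3. Vector search; the query string is embedded server-side via the sidecar.
    let hits: serde_json::Value = c.post(format!("{base}/vectors/lance/search/{idx}"))
        .json(&json!({ "query": "rust distributed systems", "k": 10 }))
        .send().await?.error_for_status()?
        .json().await?;
    println!("{hits}");
    // 4. Row count + index presence after the fact.
    let stats: serde_json::Value = c.get(format!("{base}/vectors/lance/stats/{idx}"))
        .send().await?.error_for_status()?
        .json().await?;
    println!("{stats}");
    Ok(())
}
```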
Verified live on the real resumes_100k_v2 corpus (100K × 768d):
- Migrate: 0.57s
- Build IVF_PQ index: 16.2s (matches ADR-019 bench; 14× faster than
HNSW's 230s for the same data)
- Search end-to-end (Ollama embed + Lance scan): 23-53ms
- Random doc_id fetch: 5-7ms (filter scan; faster than Parquet's
~35ms full-file scan, slower than the bench's 311us positional
take — would close that gap with a scalar btree on doc_id)
- Append 100 rows: 3.3ms / +320KB on disk vs Parquet's required
full ~330MB rewrite — the structural win
- Index survives append; both backends coexist cleanly
## Known follow-ups not in this milestone
- ModelProfile.vector_backend doesn't yet auto-route /vectors/profile/
{id}/search to Lance; callers go through /vectors/lance/* directly
- Scalar btree on doc_id (closes the 5-7ms → ~300us gap)
- vectord-lance built default-features=false → no S3 yet
- IVF_PQ recall not measured (ADR-019 caveat) — needs a Lance-aware
variant of the eval harness
- Watcher-path ingest doesn't push agent triggers (HTTP paths do)
- EvalSets still primary-only (federation gap)
- No PATCH endpoint to move an existing index between buckets
- The storaged::append_log doctest fails to compile (malformed `{prefix}/`
  parses as a code fence); pre-existing bug, left for a focused fix
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Attached file (Rust, 221 lines, 8.6 KiB):
```rust
//! Phase 16: Promoted HNSW configs — the "active generation" pointer.
//!
//! An index's HNSW config used at build time normally defaults to the
//! system-wide default (`HnswConfig::default()`). An operator or the
//! autotune agent can *promote* a specific trial's config — subsequent
//! HNSW builds against that index use the promoted config instead.
//!
//! Every promotion is history-tracked so `rollback` can revert. The
//! history file lives at `primary://_hnsw_promotions/{index_name}.json`
//! and is small (< few KB) so we rewrite it on every promotion rather
//! than append-log.
//!
//! Not included here:
//! - Atomic graph rebuild on promote — promotion only updates the sticky
//!   default. Next activation (or search that triggers lazy build) picks
//!   up the new config. That's "zero-downtime swap after build" which is
//!   what ADR-019 actually claimed; an instant-swap requires a
//!   pre-built graph pool which we don't have yet.
//! - Agent loop — lives in `vectord::autotune`.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use storaged::ops;
use storaged::registry::BucketRegistry;
use tokio::sync::RwLock;

use crate::index_registry::IndexRegistry;
use crate::trial::HnswConfig;

const PROMOTION_PREFIX: &str = "_hnsw_promotions";

/// One promotion record. The `trial_id` is the origin of the config —
/// lets operators trace back "why was this config picked?" to the exact
/// trial in the trial journal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotionEntry {
    pub config: HnswConfig,
    pub trial_id: String,
    pub promoted_at: DateTime<Utc>,
    #[serde(default)]
    pub promoted_by: String,
    #[serde(default)]
    pub note: Option<String>,
}

/// Serialized form of an index's promotion history.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PromotionFile {
    pub index_name: String,
    pub current: Option<PromotionEntry>,
    #[serde(default)]
    pub history: Vec<PromotionEntry>,
}

#[derive(Clone)]
pub struct PromotionRegistry {
    buckets: Arc<BucketRegistry>,
    index_registry: IndexRegistry,
    cache: Arc<RwLock<HashMap<String, PromotionFile>>>,
}

impl PromotionRegistry {
    pub fn new(buckets: Arc<BucketRegistry>, index_registry: IndexRegistry) -> Self {
        Self {
            buckets,
            index_registry,
            cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    fn key(index_name: &str) -> String {
        // Sanitize for object-store safety.
        let safe: String = index_name
            .chars()
            .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' })
            .collect();
        format!("{PROMOTION_PREFIX}/{safe}.json")
    }

    /// Resolve which bucket's store holds this index's promotion file.
    /// Same rules as TrialJournal::bucket_for — follows IndexMeta.bucket,
    /// defaults to primary when metadata is missing.
    async fn store_for(&self, index_name: &str) -> Result<Arc<dyn object_store::ObjectStore>, String> {
        let bucket = self.index_registry
            .get(index_name)
            .await
            .map(|m| m.bucket)
            .unwrap_or_else(|| "primary".to_string());
        self.buckets.get(&bucket)
    }

    /// Load (and cache) the promotion file for an index.
    pub async fn load(&self, index_name: &str) -> Result<PromotionFile, String> {
        if let Some(cached) = self.cache.read().await.get(index_name) {
            return Ok(cached.clone());
        }
        let store = self.store_for(index_name).await?;
        let key = Self::key(index_name);
        let file = match ops::get(&store, &key).await {
            Ok(bytes) => serde_json::from_slice::<PromotionFile>(&bytes)
                .map_err(|e| format!("parse promotion file: {e}"))?,
            Err(_) => PromotionFile {
                index_name: index_name.to_string(),
                current: None,
                history: Vec::new(),
            },
        };
        self.cache.write().await.insert(index_name.to_string(), file.clone());
        Ok(file)
    }

    /// Promote a config to the active slot. Pushes the current promotion
    /// (if any) onto the history stack. Persists before returning — the
    /// config is durable by the time this call completes.
    pub async fn promote(
        &self,
        index_name: &str,
        entry: PromotionEntry,
    ) -> Result<PromotionFile, String> {
        let mut file = self.load(index_name).await?;
        if let Some(prior) = file.current.take() {
            file.history.push(prior);
            // Cap history to something sensible so this file doesn't grow
            // unbounded. 50 entries = 50 promotions — way more than any
            // sane workflow needs.
            const HISTORY_CAP: usize = 50;
            if file.history.len() > HISTORY_CAP {
                let drop = file.history.len() - HISTORY_CAP;
                file.history.drain(0..drop);
            }
        }
        file.current = Some(entry);
        file.index_name = index_name.to_string();

        let store = self.store_for(index_name).await?;
        let key = Self::key(index_name);
        let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?;
        ops::put(&store, &key, json.into()).await?;

        self.cache.write().await.insert(index_name.to_string(), file.clone());
        tracing::info!(
            "promoted '{}' to config {:?} (trial={})",
            index_name, file.current.as_ref().unwrap().config, file.current.as_ref().unwrap().trial_id,
        );
        Ok(file)
    }

    /// Pop the latest promotion back onto the current slot (if any
    /// history exists). If current is set but history is empty, the
    /// current promotion is cleared — the index falls back to defaults.
    pub async fn rollback(&self, index_name: &str) -> Result<PromotionFile, String> {
        let mut file = self.load(index_name).await?;
        match file.history.pop() {
            Some(prev) => {
                file.current = Some(prev);
            }
            None => {
                if file.current.is_none() {
                    return Err(format!("no promotion to rollback for '{index_name}'"));
                }
                file.current = None;
            }
        }
        let store = self.store_for(index_name).await?;
        let key = Self::key(index_name);
        let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?;
        ops::put(&store, &key, json.into()).await?;
        self.cache.write().await.insert(index_name.to_string(), file.clone());
        tracing::info!("rolled back promotion for '{}'", index_name);
        Ok(file)
    }

    /// Get the currently-promoted config (if any). Callers use this to
    /// pick the right HnswConfig at build time.
    pub async fn get_current(&self, index_name: &str) -> Option<PromotionEntry> {
        self.load(index_name).await.ok().and_then(|f| f.current)
    }

    /// Convenience: return the promoted config or the provided default.
    pub async fn config_or(&self, index_name: &str, default: HnswConfig) -> HnswConfig {
        match self.get_current(index_name).await {
            Some(entry) => entry.config,
            None => default,
        }
    }

    /// List every index that has a promotion recorded (for operator UI).
    ///
    /// Federation: scans EVERY registered bucket for promotion files.
    /// Per-profile buckets each have their own `_hnsw_promotions/` so we
    /// aggregate across them. Dedups by index_name — if the same index
    /// somehow has promotion files in multiple buckets, the one from the
    /// bucket recorded in IndexMeta wins.
    pub async fn list_all(&self) -> Result<Vec<PromotionFile>, String> {
        let bucket_infos = self.buckets.list().await;
        let mut by_name: HashMap<String, PromotionFile> = HashMap::new();

        for b in &bucket_infos {
            let store = match self.buckets.get(&b.name) {
                Ok(s) => s,
                Err(_) => continue,
            };
            let keys = ops::list(&store, Some(&format!("{PROMOTION_PREFIX}/")))
                .await.unwrap_or_default();
            for key in keys {
                if !key.ends_with(".json") { continue; }
                let bytes = match ops::get(&store, &key).await {
                    Ok(b) => b,
                    Err(_) => continue,
                };
                if let Ok(f) = serde_json::from_slice::<PromotionFile>(&bytes) {
                    by_name.insert(f.index_name.clone(), f);
                }
            }
        }
        Ok(by_name.into_values().collect())
    }
}
```