Phase 16: Hot-swap generations + autotune agent loop
Closes the self-iteration loop from the PRD reframe: an agent can
tune HNSW configs autonomously and the winner flows through to the
next profile activation without human intervention.
Three primitives:
1. PromotionRegistry (vectord::promotion)
- Per-index current + history at _hnsw_promotions/{index}.json
- promote(index, entry) atomically swaps current, pushes prior
onto history (capped at 50)
- rollback() pops history back onto current; clears current if
history exhausted
- config_or(index, default) — the read side used at build time,
returns promoted config if set else caller's default
- Full cache + persistence; writes are durable on return
2. Autotune (vectord::autotune)
- run_autotune(request, ...) — synchronous agent loop
- Default grid: 5 configs covering the practical range
(ec=20/40/80/80/160, es=30/30/30/60/30) with seed=42 for
reproducibility
- Every trial goes through the existing trial-journal pipeline
so autotune runs land alongside manual trials in the
"trials are data" log
- Winner: max recall first, then min p50 latency; must clear
min_recall gate (default 0.9) or no promotion happens
- Config bounds (ec ∈ [10,400], es ∈ [10,200]) reject absurd
values from the request's optional custom grid
- On winner: promote with note "autotune winner: recall=X p50=Y"
3. Wiring
- VectorState gains promotion_registry
- activate_profile now calls promotion_registry.config_or(...)
so newly-promoted configs are picked up on next activation —
the "hot-swap" is: autotune promotes -> profile activates ->
HNSW rebuilt with new config
- New endpoints:
POST /vectors/hnsw/promote/{index}/{trial_id}
?promoted_by=...&note=...
POST /vectors/hnsw/rollback/{index}
GET /vectors/hnsw/promoted/{index}
POST /vectors/hnsw/autotune { index_name, harness,
min_recall?, grid? }
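The promote/rollback stack semantics of primitive 1 can be sketched in isolation. This is a minimal model under stated assumptions, not the real `vectord::promotion::PromotionRegistry` — the `Promotions` type and the `u32` stand-in for a `PromotionEntry` are hypothetical, and persistence, caching, and error reporting are omitted:

```rust
// Minimal sketch of the promote/rollback pointer semantics.
// `Promotions` is a stand-in, not the real PromotionRegistry;
// a u32 stands in for a PromotionEntry.
#[derive(Default)]
struct Promotions {
    current: Option<u32>,
    history: Vec<u32>,
}

const HISTORY_CAP: usize = 50;

impl Promotions {
    /// promote: prior current is pushed onto history (capped at 50).
    fn promote(&mut self, entry: u32) {
        if let Some(prior) = self.current.take() {
            self.history.push(prior);
            if self.history.len() > HISTORY_CAP {
                let drop = self.history.len() - HISTORY_CAP;
                self.history.drain(0..drop);
            }
        }
        self.current = Some(entry);
    }

    /// rollback: pop history back onto current; clears current when
    /// history is exhausted. (The real registry errors when there is
    /// nothing at all to roll back; this sketch just stays empty.)
    fn rollback(&mut self) {
        self.current = self.history.pop();
    }

    /// config_or: the read side used at build time.
    fn config_or(&self, default: u32) -> u32 {
        self.current.unwrap_or(default)
    }
}

fn main() {
    let mut p = Promotions::default();
    p.promote(1);
    p.promote(2);               // 1 moves onto history
    assert_eq!(p.config_or(0), 2);
    p.rollback();               // back to 1
    assert_eq!(p.config_or(0), 1);
    p.rollback();               // history exhausted -> cleared
    assert_eq!(p.config_or(0), 0); // falls back to caller's default
}
```

The key property is that `current` is a persistent pointer with a bounded undo stack: every promote is reversible until the stack empties, after which the index falls back to defaults.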
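The winner rule from primitive 2 (max recall first, then min p50, gated by min_recall) reduces to a single keyed minimum over eligible trials. A standalone sketch — `T` is a hypothetical stand-in record, not the real `Trial` type:

```rust
// Sketch of the winner rule: among trials clearing the min_recall
// gate, pick max recall, tie-breaking on min p50 latency.
// `T` is a stand-in record, not the real vectord::trial::Trial.
struct T {
    recall: f32,
    p50_us: f32,
}

fn pick_winner(trials: &[T], min_recall: f32) -> Option<&T> {
    trials
        .iter()
        .filter(|t| t.recall >= min_recall)
        // Key by (-recall, p50) so the minimum of the key is the best
        // trial: higher recall wins, then lower latency.
        .min_by(|a, b| {
            (-a.recall, a.p50_us)
                .partial_cmp(&(-b.recall, b.p50_us))
                .unwrap_or(std::cmp::Ordering::Equal)
        })
}

fn main() {
    let trials = vec![
        T { recall: 0.95, p50_us: 80.0 },
        T { recall: 1.00, p50_us: 70.0 },
        T { recall: 1.00, p50_us: 64.0 }, // max recall, lowest p50
        T { recall: 0.80, p50_us: 10.0 }, // fast but fails the 0.9 gate
    ];
    let w = pick_winner(&trials, 0.9).unwrap();
    assert_eq!(w.p50_us, 64.0);
    // An all-bad grid promotes nothing:
    assert!(pick_winner(&[T { recall: 0.5, p50_us: 1.0 }], 0.9).is_none());
}
```

Negating recall in the sort key lets one `min_by` express "maximize, then minimize" without a custom comparator cascade; the `min_recall` filter guarantees a fast-but-useless config can never win by latency alone.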
End-to-end verified on threat_intel_v1 (54 vectors):
- autogen harness 'threat_intel_smoke' (10 queries)
- POST /autotune -> 5 trials in 620ms, winner ec=20 es=30
recall=1.00 p50=64us auto-promoted
- Manual promote of ec=80 es=30 -> history depth 1
- Rollback -> back to ec=20 es=30 autotune winner
- Second rollback -> current cleared
- Re-promote + restart -> persistence verified
- Profile activation after promotion logged:
"building HNSW ef_construction=80 ef_search=30 seed=Some(42)"
proving the hot-swap loop is closed.
Deferred:
- Bayesian optimization (random-grid is fine at this config-space size)
- Append-triggered autotune (Phase 17.5 — refresh OnAppend policy
can schedule autotune after appending sufficient new rows)
- Concurrent autotune per index guard (JobTracker integration)
PRD invariants satisfied: invariant 8 (hot-swappable indexes) is now
real code — promote is atomic, rollback is always available, the
active generation is a persistent pointer not a runtime convention.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in: commit 4d5c49090c (parent a293502265)
@@ -100,6 +100,7 @@ async fn main() {
             embedding_cache: vectord::embedding_cache::EmbeddingCache::new(store.clone()),
             trial_journal: vectord::trial::TrialJournal::new(store.clone()),
             catalog: registry.clone(),
+            promotion_registry: vectord::promotion::PromotionRegistry::new(store.clone()),
         }
     }))
     .nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
crates/vectord/src/autotune.rs (new file, 328 lines)
@@ -0,0 +1,328 @@
//! Phase 16: Autotune — the agent loop for HNSW tuning.
//!
//! Given an index and a harness, run a grid of (ef_construction,
//! ef_search) trials, pick the Pareto winner, and promote it.
//!
//! This is deliberately NOT a sophisticated optimizer — random-grid
//! search is good enough when the config space is small (a dozen
//! combinations) and the work per trial is seconds, not hours. Think
//! of it as a "smart operator" rather than Bayesian optimization.
//!
//! Score: we want high recall and low p50 latency. The function below
//! picks the Pareto-dominating trial: highest recall first, then
//! lowest p50 search latency.
//!
//! Safeguards (PRD risk mitigation):
//! - Never promote a trial with recall < `min_recall` (default 0.9)
//! - Hard config bounds: ef_construction ∈ [10, 400], ef_search ∈ [10, 200]
//! - Concurrency cap: one autotune per index at a time (tracked by
//!   JobTracker — caller is responsible for not firing in parallel
//!   for the same index)

use chrono::Utc;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

use aibridge::client::AiClient;
use catalogd::registry::Registry as CatalogRegistry;

use crate::embedding_cache::EmbeddingCache;
use crate::harness;
use crate::hnsw::HnswStore;
use crate::index_registry::IndexRegistry;
use crate::jobs::JobTracker;
use crate::promotion::{PromotionEntry, PromotionRegistry};
use crate::trial::{HnswConfig, Trial, TrialJournal, TrialMetrics};

#[derive(Debug, Clone, Deserialize)]
pub struct AutotuneRequest {
    pub index_name: String,
    pub harness: String,
    /// Minimum recall a trial must achieve to be eligible for promotion.
    /// Default 0.9 — below that, the tuning is not producing useful
    /// answers. Never promote a bad index.
    #[serde(default = "default_min_recall")]
    pub min_recall: f32,
    /// Override the default config grid (rarely needed). When omitted,
    /// a small curated grid runs.
    #[serde(default)]
    pub grid: Option<Vec<HnswConfig>>,
}

fn default_min_recall() -> f32 { 0.9 }

#[derive(Debug, Clone, Serialize)]
pub struct AutotuneResult {
    pub index_name: String,
    pub trials_run: usize,
    pub trials_eligible: usize,
    pub winner: Option<WinnerReport>,
    pub promoted: bool,
    pub duration_secs: f32,
    pub skipped: Vec<String>,
}

#[derive(Debug, Clone, Serialize)]
pub struct WinnerReport {
    pub trial_id: String,
    pub config: HnswConfig,
    pub recall_at_k: f32,
    pub search_latency_p50_us: f32,
    pub build_time_secs: f32,
}

/// Default exploration grid. Covers the practical range of settings
/// from "fast build, OK recall" to "thorough build, great recall".
fn default_grid() -> Vec<HnswConfig> {
    vec![
        HnswConfig { ef_construction: 20, ef_search: 30, seed: Some(42) },
        HnswConfig { ef_construction: 40, ef_search: 30, seed: Some(42) },
        HnswConfig { ef_construction: 80, ef_search: 30, seed: Some(42) },
        HnswConfig { ef_construction: 80, ef_search: 60, seed: Some(42) },
        HnswConfig { ef_construction: 160, ef_search: 30, seed: Some(42) },
    ]
}

/// Config-space bounds. Rejects anything outside these before running a
/// trial — defends against an agent getting creative with absurd values.
fn sanitize_grid(grid: Vec<HnswConfig>) -> (Vec<HnswConfig>, Vec<String>) {
    let mut ok = Vec::new();
    let mut rejected = Vec::new();
    for cfg in grid {
        if !(10..=400).contains(&cfg.ef_construction) {
            rejected.push(format!(
                "ef_construction={} out of bounds [10,400]", cfg.ef_construction,
            ));
            continue;
        }
        if !(10..=200).contains(&cfg.ef_search) {
            rejected.push(format!(
                "ef_search={} out of bounds [10,200]", cfg.ef_search,
            ));
            continue;
        }
        ok.push(cfg);
    }
    (ok, rejected)
}

/// Pareto winner: highest recall, then lowest p50 latency among
/// eligible trials (recall >= min_recall).
fn pick_winner(trials: &[Trial], min_recall: f32) -> Option<&Trial> {
    trials
        .iter()
        .filter(|t| t.metrics.recall_at_k >= min_recall)
        .min_by(|a, b| {
            // Compare by (-recall, p50_latency) so the minimum is the
            // best trial: higher recall wins, then lower latency.
            let a_key = (-a.metrics.recall_at_k, a.metrics.search_latency_p50_us);
            let b_key = (-b.metrics.recall_at_k, b.metrics.search_latency_p50_us);
            a_key.partial_cmp(&b_key).unwrap_or(std::cmp::Ordering::Equal)
        })
}

/// Run the autotune loop. Synchronous — caller can wrap in tokio::spawn
/// to make it fire-and-forget. Uses the existing trial flow so every
/// autotune step lands in the trial journal.
#[allow(clippy::too_many_arguments)]
pub async fn run_autotune(
    req: AutotuneRequest,
    store: &Arc<dyn ObjectStore>,
    catalog: &CatalogRegistry,
    ai_client: &AiClient,
    embedding_cache: &EmbeddingCache,
    hnsw_store: &HnswStore,
    index_registry: &IndexRegistry,
    trial_journal: &TrialJournal,
    promotion_registry: &PromotionRegistry,
    _job_tracker: &JobTracker,
) -> Result<AutotuneResult, String> {
    let t0 = std::time::Instant::now();

    // Sanity: the index has to exist.
    if index_registry.get(&req.index_name).await.is_none() {
        return Err(format!("index not found: {}", req.index_name));
    }
    let _ = catalog; // reserved for future audit emission

    // Load the harness once, compute ground truth once.
    let mut harness_set = harness::EvalSet::load(store, &req.harness)
        .await
        .map_err(|e| format!("load harness: {e}"))?;

    let embeddings = embedding_cache
        .get_or_load(&req.index_name)
        .await
        .map_err(|e| format!("embeddings: {e}"))?;

    if !harness_set.ground_truth_built {
        harness::compute_ground_truth(&mut harness_set, &embeddings, ai_client)
            .await
            .map_err(|e| format!("ground truth: {e}"))?;
        harness_set.save(store).await.ok();
    }

    let (grid, rejected) = sanitize_grid(req.grid.clone().unwrap_or_else(default_grid));
    if grid.is_empty() {
        return Err("all configs rejected by bounds check".into());
    }

    tracing::info!(
        "autotune '{}': {} trial configs, min_recall={}",
        req.index_name, grid.len(), req.min_recall,
    );

    // Run each trial through the existing pipeline so every run lands
    // in the trial journal and contributes to the long-term tuning
    // memory. This is the llms3.com-style "trials are data" pattern.
    let mut run_trials: Vec<Trial> = Vec::with_capacity(grid.len());
    for config in &grid {
        let trial = run_single_trial(
            &req.index_name,
            &harness_set,
            &embeddings,
            config,
            hnsw_store,
            trial_journal,
            ai_client,
        ).await;
        match trial {
            Ok(t) => run_trials.push(t),
            Err(e) => tracing::warn!("autotune: trial ec={} es={} failed: {e}",
                config.ef_construction, config.ef_search),
        }
    }

    let trials_eligible = run_trials
        .iter()
        .filter(|t| t.metrics.recall_at_k >= req.min_recall)
        .count();

    let winner = pick_winner(&run_trials, req.min_recall);
    let mut promoted = false;

    let winner_report = if let Some(w) = winner {
        let entry = PromotionEntry {
            config: w.config.clone(),
            trial_id: w.id.clone(),
            promoted_at: Utc::now(),
            promoted_by: "autotune".to_string(),
            note: Some(format!(
                "autotune winner: recall={:.3} p50={:.0}us",
                w.metrics.recall_at_k, w.metrics.search_latency_p50_us,
            )),
        };
        if let Err(e) = promotion_registry.promote(&req.index_name, entry).await {
            tracing::warn!("autotune: promote failed: {e}");
        } else {
            promoted = true;
        }
        Some(WinnerReport {
            trial_id: w.id.clone(),
            config: w.config.clone(),
            recall_at_k: w.metrics.recall_at_k,
            search_latency_p50_us: w.metrics.search_latency_p50_us,
            build_time_secs: w.metrics.build_time_secs,
        })
    } else {
        tracing::warn!(
            "autotune '{}': no trial met min_recall={}; not promoting",
            req.index_name, req.min_recall,
        );
        None
    };

    Ok(AutotuneResult {
        index_name: req.index_name,
        trials_run: run_trials.len(),
        trials_eligible,
        winner: winner_report,
        promoted,
        duration_secs: t0.elapsed().as_secs_f32(),
        skipped: rejected,
    })
}

/// Run one trial and journal it. Returns the recorded Trial.
#[allow(clippy::too_many_arguments)]
async fn run_single_trial(
    index_name: &str,
    harness_set: &harness::EvalSet,
    embeddings: &Arc<Vec<crate::store::StoredEmbedding>>,
    config: &HnswConfig,
    hnsw_store: &HnswStore,
    trial_journal: &TrialJournal,
    _ai_client: &AiClient,
) -> Result<Trial, String> {
    let trial_id = Trial::new_id();
    let slot = format!("{}__{}", index_name, trial_id);

    let build = hnsw_store
        .build_index_with_config(&slot, (**embeddings).clone(), config)
        .await?;

    let query_vectors: Vec<Vec<f32>> = harness_set
        .queries
        .iter()
        .filter_map(|q| q.query_embedding.clone())
        .collect();
    let bench = hnsw_store
        .bench_search(&slot, &query_vectors, harness_set.k)
        .await?;

    let mut recalls = Vec::with_capacity(harness_set.queries.len());
    for (q, hits) in harness_set.queries.iter().zip(bench.retrieved.iter()) {
        if let Some(gt) = &q.ground_truth {
            recalls.push(harness::recall_at_k(hits, gt, harness_set.k));
        }
    }
    let mean_recall = if recalls.is_empty() {
        0.0
    } else {
        recalls.iter().sum::<f32>() / recalls.len() as f32
    };

    let mut lats = bench.latencies_us.clone();
    lats.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    let p = |pct: f32| -> f32 {
        if lats.is_empty() { return 0.0; }
        let idx = ((lats.len() as f32 - 1.0) * pct).round() as usize;
        lats[idx.min(lats.len() - 1)]
    };

    let brute_us = if let Some(qv) = query_vectors.first() {
        let t = std::time::Instant::now();
        let _ = harness::brute_force_top_k(qv, embeddings, harness_set.k);
        t.elapsed().as_micros() as f32
    } else {
        0.0
    };

    let dims = embeddings.first().map(|e| e.vector.len()).unwrap_or(0);
    let memory_bytes =
        (embeddings.len() * dims * std::mem::size_of::<f32>() + embeddings.len() * 128) as u64;

    let trial = Trial {
        id: trial_id.clone(),
        index_name: index_name.to_string(),
        eval_set: harness_set.name.clone(),
        config: config.clone(),
        metrics: TrialMetrics {
            build_time_secs: build.build_time_secs,
            search_latency_p50_us: p(0.50),
            search_latency_p95_us: p(0.95),
            search_latency_p99_us: p(0.99),
            recall_at_k: mean_recall,
            memory_bytes,
            vectors: build.vectors,
            eval_queries: harness_set.queries.len(),
            brute_force_latency_us: brute_us,
        },
        created_at: Utc::now(),
        note: Some("autotune".to_string()),
    };
    let _ = trial_journal.append(&trial).await;
    let _ = hnsw_store.drop(&slot).await;
    Ok(trial)
}
@@ -1,9 +1,11 @@
+pub mod autotune;
 pub mod chunker;
 pub mod embedding_cache;
 pub mod harness;
 pub mod hnsw;
 pub mod index_registry;
 pub mod jobs;
+pub mod promotion;
 pub mod refresh;
 pub mod store;
 pub mod search;
crates/vectord/src/promotion.rs (new file, 187 lines)
@@ -0,0 +1,187 @@
//! Phase 16: Promoted HNSW configs — the "active generation" pointer.
//!
//! An index's HNSW config used at build time normally defaults to the
//! system-wide default (`HnswConfig::default()`). An operator or the
//! autotune agent can *promote* a specific trial's config — subsequent
//! HNSW builds against that index use the promoted config instead.
//!
//! Every promotion is history-tracked so `rollback` can revert. The
//! history file lives at `primary://_hnsw_promotions/{index_name}.json`
//! and is small (< a few KB) so we rewrite it on every promotion rather
//! than append-log.
//!
//! Not included here:
//! - Atomic graph rebuild on promote — promotion only updates the sticky
//!   default. The next activation (or search that triggers a lazy build)
//!   picks up the new config. That's "zero-downtime swap after build",
//!   which is what ADR-019 actually claimed; an instant swap requires a
//!   pre-built graph pool which we don't have yet.
//! - Agent loop — lives in `vectord::autotune`.

use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use storaged::ops;
use tokio::sync::RwLock;

use crate::trial::HnswConfig;

const PROMOTION_PREFIX: &str = "_hnsw_promotions";

/// One promotion record. The `trial_id` is the origin of the config —
/// lets operators trace back "why was this config picked?" to the exact
/// trial in the trial journal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotionEntry {
    pub config: HnswConfig,
    pub trial_id: String,
    pub promoted_at: DateTime<Utc>,
    #[serde(default)]
    pub promoted_by: String,
    #[serde(default)]
    pub note: Option<String>,
}

/// Serialized form of an index's promotion history.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PromotionFile {
    pub index_name: String,
    pub current: Option<PromotionEntry>,
    #[serde(default)]
    pub history: Vec<PromotionEntry>,
}

#[derive(Clone)]
pub struct PromotionRegistry {
    store: Arc<dyn ObjectStore>,
    cache: Arc<RwLock<HashMap<String, PromotionFile>>>,
}

impl PromotionRegistry {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            store,
            cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    fn key(index_name: &str) -> String {
        // Sanitize for object-store safety.
        let safe: String = index_name
            .chars()
            .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' })
            .collect();
        format!("{PROMOTION_PREFIX}/{safe}.json")
    }

    /// Load (and cache) the promotion file for an index.
    pub async fn load(&self, index_name: &str) -> Result<PromotionFile, String> {
        if let Some(cached) = self.cache.read().await.get(index_name) {
            return Ok(cached.clone());
        }
        let key = Self::key(index_name);
        let file = match ops::get(&self.store, &key).await {
            Ok(bytes) => serde_json::from_slice::<PromotionFile>(&bytes)
                .map_err(|e| format!("parse promotion file: {e}"))?,
            Err(_) => PromotionFile {
                index_name: index_name.to_string(),
                current: None,
                history: Vec::new(),
            },
        };
        self.cache.write().await.insert(index_name.to_string(), file.clone());
        Ok(file)
    }

    /// Promote a config to the active slot. Pushes the current promotion
    /// (if any) onto the history stack. Persists before returning — the
    /// config is durable by the time this call completes.
    pub async fn promote(
        &self,
        index_name: &str,
        entry: PromotionEntry,
    ) -> Result<PromotionFile, String> {
        let mut file = self.load(index_name).await?;
        if let Some(prior) = file.current.take() {
            file.history.push(prior);
            // Cap history to something sensible so this file doesn't grow
            // unbounded. 50 entries = 50 promotions — way more than any
            // sane workflow needs.
            const HISTORY_CAP: usize = 50;
            if file.history.len() > HISTORY_CAP {
                let drop = file.history.len() - HISTORY_CAP;
                file.history.drain(0..drop);
            }
        }
        file.current = Some(entry);
        file.index_name = index_name.to_string();

        let key = Self::key(index_name);
        let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?;
        ops::put(&self.store, &key, json.into()).await?;

        self.cache.write().await.insert(index_name.to_string(), file.clone());
        tracing::info!(
            "promoted '{}' to config {:?} (trial={})",
            index_name, file.current.as_ref().unwrap().config, file.current.as_ref().unwrap().trial_id,
        );
        Ok(file)
    }

    /// Pop the latest promotion back onto the current slot (if any
    /// history exists). If current is set but history is empty, the
    /// current promotion is cleared — the index falls back to defaults.
    pub async fn rollback(&self, index_name: &str) -> Result<PromotionFile, String> {
        let mut file = self.load(index_name).await?;
        match file.history.pop() {
            Some(prev) => {
                file.current = Some(prev);
            }
            None => {
                if file.current.is_none() {
                    return Err(format!("no promotion to rollback for '{index_name}'"));
                }
                file.current = None;
            }
        }
        let key = Self::key(index_name);
        let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?;
        ops::put(&self.store, &key, json.into()).await?;
        self.cache.write().await.insert(index_name.to_string(), file.clone());
        tracing::info!("rolled back promotion for '{}'", index_name);
        Ok(file)
    }

    /// Get the currently-promoted config (if any). Callers use this to
    /// pick the right HnswConfig at build time.
    pub async fn get_current(&self, index_name: &str) -> Option<PromotionEntry> {
        self.load(index_name).await.ok().and_then(|f| f.current)
    }

    /// Convenience: return the promoted config or the provided default.
    pub async fn config_or(&self, index_name: &str, default: HnswConfig) -> HnswConfig {
        match self.get_current(index_name).await {
            Some(entry) => entry.config,
            None => default,
        }
    }

    /// List every index that has a promotion recorded (for operator UI).
    pub async fn list_all(&self) -> Result<Vec<PromotionFile>, String> {
        let keys = ops::list(&self.store, Some(&format!("{PROMOTION_PREFIX}/"))).await.unwrap_or_default();
        let mut out = Vec::new();
        for key in keys {
            if !key.ends_with(".json") { continue; }
            let bytes = match ops::get(&self.store, &key).await {
                Ok(b) => b,
                Err(_) => continue,
            };
            if let Ok(f) = serde_json::from_slice::<PromotionFile>(&bytes) {
                out.push(f);
            }
        }
        Ok(out)
    }
}
@@ -11,7 +11,7 @@ use std::sync::Arc;

 use aibridge::client::{AiClient, EmbedRequest};
 use catalogd::registry::Registry as CatalogRegistry;
-use crate::{chunker, embedding_cache, harness, hnsw, index_registry, jobs, rag, refresh, search, store, supervisor, trial};
+use crate::{autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, promotion, rag, refresh, search, store, supervisor, trial};

 #[derive(Clone)]
 pub struct VectorState {
@@ -25,6 +25,8 @@ pub struct VectorState {
     /// Catalog registry — needed by the Phase C refresh path to mark/clear
     /// staleness and look up dataset manifests.
     pub catalog: CatalogRegistry,
+    /// Phase 16: promoted HNSW configs. Activation + autotune read/write here.
+    pub promotion_registry: promotion::PromotionRegistry,
 }

 pub fn router(state: VectorState) -> Router {
@@ -58,6 +60,11 @@ pub fn router(state: VectorState) -> Router {
         // model's bound data. First search after activate is warm.
         .route("/profile/{id}/activate", post(activate_profile))
         .route("/profile/{id}/search", post(profile_scoped_search))
+        // Phase 16: promotion + autotune
+        .route("/hnsw/promote/{index}/{trial_id}", post(promote_trial))
+        .route("/hnsw/rollback/{index}", post(rollback_promotion))
+        .route("/hnsw/promoted/{index}", get(get_promoted))
+        .route("/hnsw/autotune", post(run_autotune_endpoint))
         .with_state(state)
 }

@@ -806,12 +813,19 @@ async fn activate_profile(
         };
         total_vectors += embeddings.len();

-        // Build HNSW with the profile's config.
-        let cfg = trial::HnswConfig {
+        // Build HNSW with the index's PROMOTED config if one exists
+        // (Phase 16 hot-swap), otherwise fall back to the profile's
+        // declared defaults. This means autotune's winner is picked
+        // up on the next activation automatically.
+        let profile_default = trial::HnswConfig {
             ef_construction: profile.hnsw_config.ef_construction,
             ef_search: profile.hnsw_config.ef_search,
             seed: profile.hnsw_config.seed,
         };
+        let cfg = state
+            .promotion_registry
+            .config_or(&meta.index_name, profile_default)
+            .await;
         let build_t = std::time::Instant::now();
         match state
             .hnsw_store
@@ -919,3 +933,83 @@ async fn profile_scoped_search(
     })))
 }
 }
+
+// --- Phase 16: Promotion + autotune ---
+
+#[derive(Deserialize)]
+struct PromoteQuery {
+    #[serde(default)]
+    promoted_by: String,
+    #[serde(default)]
+    note: Option<String>,
+}
+
+async fn promote_trial(
+    State(state): State<VectorState>,
+    Path((index_name, trial_id)): Path<(String, String)>,
+    Query(q): Query<PromoteQuery>,
+) -> impl IntoResponse {
+    // Pull the trial from the journal to get its config.
+    let trials = state
+        .trial_journal
+        .list(&index_name)
+        .await
+        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
+    let trial = trials
+        .iter()
+        .find(|t| t.id == trial_id)
+        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("trial not found: {trial_id}")))?;
+
+    let entry = promotion::PromotionEntry {
+        config: trial.config.clone(),
+        trial_id: trial.id.clone(),
+        promoted_at: chrono::Utc::now(),
+        promoted_by: q.promoted_by,
+        note: q.note,
+    };
+    match state.promotion_registry.promote(&index_name, entry).await {
+        Ok(file) => Ok(Json(file)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
+async fn rollback_promotion(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+) -> impl IntoResponse {
+    match state.promotion_registry.rollback(&index_name).await {
+        Ok(file) => Ok(Json(file)),
+        Err(e) => Err((StatusCode::NOT_FOUND, e)),
+    }
+}
+
+async fn get_promoted(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+) -> impl IntoResponse {
+    match state.promotion_registry.load(&index_name).await {
+        Ok(file) => Ok(Json(file)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
+async fn run_autotune_endpoint(
+    State(state): State<VectorState>,
+    Json(req): Json<autotune::AutotuneRequest>,
+) -> impl IntoResponse {
+    match autotune::run_autotune(
+        req,
+        &state.store,
+        &state.catalog,
+        &state.ai_client,
+        &state.embedding_cache,
+        &state.hnsw_store,
+        &state.index_registry,
+        &state.trial_journal,
+        &state.promotion_registry,
+        &state.job_tracker,
+    ).await {
+        Ok(result) => Ok(Json(result)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
@@ -154,6 +154,13 @@
 - `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack
 - 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard
 - Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling.
+- [x] Phase 16: Hot-swap generations + autotune agent — 2026-04-16
+  - `vectord::promotion::PromotionRegistry` — per-index current config + history at `_hnsw_promotions/{index}.json`, cap 50 history entries
+  - Endpoints: `POST /vectors/hnsw/promote/{index}/{trial_id}`, `POST /vectors/hnsw/rollback/{index}`, `GET /vectors/hnsw/promoted/{index}`
+  - `vectord::autotune::run_autotune` — grid of trials (configurable or default 5 configs), Pareto winner selection (max recall, then min p50), min_recall safety gate (default 0.9), config bounds (ec ∈ [10,400], es ∈ [10,200])
+  - `POST /vectors/hnsw/autotune` — runs the full loop synchronously, journals every trial, auto-promotes winner
+  - `activate_profile` uses `promotion_registry.config_or(..., profile_default)` so newly-promoted configs flow automatically into next activation
+  - End-to-end: autogen harness for threat_intel_v1 (10 queries), autotune ran 5 trials (all recall=1.00, p50 64-68us), promoted ec=20 es=30 at recall=1.0 p50=64us as winner. Manual promote of ec=80 es=30 pushed autotune pick onto history. Rollback restored autotune winner. Second rollback cleared to None. Re-promote + restart verified persistence. Activation after promotion logged "building HNSW ef_construction=80 ef_search=30 seed=42" — config flowed through correctly.
 - [x] Phase 17: Model profiles + scoped search — 2026-04-16
   - `shared::types::ModelProfile` — { id, ollama_name, description, bound_datasets, hnsw_config, embed_model, created_at, created_by }
   - `shared::types::ProfileHnswConfig` — mirror of vectord's HnswConfig to avoid cross-crate dep cycle (defaults ec=80 es=30 matching Phase 15 winner)