root dbe00d018f Federation foundation + HNSW trial system + Postgres streaming + PRD reframe
Four shipped features, each measured end-to-end, plus a PRD realignment:

HNSW trial system (Phase 15 horizon item → complete)
- vectord: EmbeddingCache, harness (eval sets + brute-force ground truth),
  TrialJournal, parameterized HnswConfig on build_index_with_config
- /vectors/hnsw/trial, /hnsw/trials/{idx}, /hnsw/trials/{idx}/best,
  /hnsw/evals/{name}/autogen, /hnsw/cache/stats
- Measured on resumes_100k_v2 (100K × 768d): brute-force 44ms → HNSW 873us
  at 100% recall@10. ec=80 es=30 locked as HnswConfig::default()
- Lower ec values trade recall for build time: 20/30 = 0.96 recall in 8s,
  80/30 = 1.00 recall in 230s
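The recall figures above come from scoring HNSW results against brute-force ground truth on the same eval set. A minimal sketch of that comparison; the function name and shapes are illustrative, not the harness's actual API:

```rust
use std::collections::HashSet;

/// Mean recall@k: for each query, the fraction of brute-force ground-truth
/// doc_ids that the approximate (HNSW) result list also retrieved.
fn recall_at_k(ground_truth: &[Vec<String>], retrieved: &[Vec<String>], k: usize) -> f32 {
    assert_eq!(ground_truth.len(), retrieved.len());
    let mut total = 0.0;
    for (gt, got) in ground_truth.iter().zip(retrieved) {
        let gt_set: HashSet<&String> = gt.iter().take(k).collect();
        let hits = got.iter().take(k).filter(|id| gt_set.contains(id)).count();
        total += hits as f32 / gt_set.len() as f32;
    }
    total / ground_truth.len() as f32
}
```

A trial's reported recall is this mean over all eval queries; 1.00 means every brute-force top-k id was also in the HNSW top-k.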

Catalog manifest repair
- catalogd: resync_from_parquet reads parquet footers to restore row_count
  and columns on drifted manifests
- POST /catalog/datasets/{name}/resync + POST /catalog/resync-missing
- All 7 staffing tables recovered to PRD-matching 2,469,278 rows
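Conceptually, resync_from_parquet treats the parquet footers as the source of truth and overwrites the drifted manifest fields from them. A simplified sketch with hypothetical types (the real catalogd reads footer metadata from the files themselves; row counts sum across files, columns come from the shared schema):

```rust
/// Per-file stats as recovered from a parquet footer (hypothetical shape).
struct FooterStats {
    row_count: u64,
    columns: Vec<String>,
}

/// Catalog manifest fields that can drift from the files on disk.
#[derive(Debug, PartialEq)]
struct Manifest {
    row_count: u64,
    columns: Vec<String>,
}

/// Rebuild manifest fields from footer stats: total rows is the sum over
/// all files; the column list is taken from the first footer, since every
/// file in a dataset shares one schema.
fn resync_from_footers(footers: &[FooterStats]) -> Option<Manifest> {
    let first = footers.first()?;
    Some(Manifest {
        row_count: footers.iter().map(|f| f.row_count).sum(),
        columns: first.columns.clone(),
    })
}
```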

Federation foundation (ADR-017)
- shared::secrets: SecretsProvider trait + FileSecretsProvider (reads
  /etc/lakehouse/secrets.toml, enforces 0600 perms)
- storaged::registry::BucketRegistry — multi-bucket resolution with
  rescue_bucket read fallback and reachability probing
- storaged::error_journal — bucket op failures visible in one HTTP call
- storaged::append_log — write-once batched append pattern (fixes the RMW
  anti-pattern llms3.com calls out; errors and trial journals both use it)
- /storage/buckets, /storage/errors, /storage/bucket-health,
  /storage/errors/{flush,compact}
- Bucket-aware I/O at /storage/buckets/{bucket}/objects/{*key} with
  X-Lakehouse-Rescue-Used observability headers on fallback
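The ADR-018 append-log pattern replaces read-modify-write (fetch an object, mutate it, rewrite the whole thing) with buffered records flushed as new, immutable segments. An in-memory sketch of the idea; the struct and method names are illustrative, not storaged's actual API:

```rust
/// Write-once append log: records are buffered, then flushed as a new
/// immutable segment rather than rewriting one ever-growing object (the
/// RMW anti-pattern). Errors and trial journals both follow this shape.
struct AppendLog {
    buffer: Vec<String>,
    segments: Vec<Vec<String>>, // each flush produces one write-once segment
}

impl AppendLog {
    fn new() -> Self {
        Self { buffer: Vec::new(), segments: Vec::new() }
    }

    fn append(&mut self, record: String) {
        self.buffer.push(record);
    }

    /// Flush the buffer as one new segment; previously written segments are
    /// never touched. Returns the segment count.
    fn flush(&mut self) -> usize {
        if !self.buffer.is_empty() {
            self.segments.push(std::mem::take(&mut self.buffer));
        }
        self.segments.len()
    }

    /// Read = concatenate segments in write order, plus any unflushed records.
    fn read_all(&self) -> Vec<&String> {
        self.segments.iter().flatten().chain(self.buffer.iter()).collect()
    }
}
```

Compaction (as in /storage/errors/compact) would fold many small segments into one larger write-once object, again without mutating anything in place.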

Postgres streaming ingest
- ingestd::pg_stream: DSN parser, batched ORDER BY + LIMIT/OFFSET pagination
  into ArrowWriter, lineage redacts password
- POST /ingest/db — verified against live knowledge_base.team_runs
  (586 rows × 13 cols, 6 batches, 196ms end-to-end)
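The pagination here is plain keyless paging: a stable ORDER BY plus LIMIT/OFFSET per batch, one Arrow batch per page. A sketch of the query generation (the function name is illustrative; table and column names are placeholders):

```rust
/// Build the paged query for batch `batch_idx`: ORDER BY keeps the scan
/// order stable across batches, LIMIT/OFFSET selects one batch's window.
fn batch_query(table: &str, order_col: &str, batch_size: usize, batch_idx: usize) -> String {
    format!(
        "SELECT * FROM {table} ORDER BY {order_col} LIMIT {batch_size} OFFSET {}",
        batch_idx * batch_size
    )
}
```

Each page is fed to the ArrowWriter as one batch; the ORDER BY is what makes OFFSET-based paging safe against reordering between queries.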

PRD realignment (2026-04-16)
- Dual use case: staffing analytics + local LLM knowledge substrate
- Removed "multi-tenancy (single-owner system)" from non-goals
- Added invariants 8-11: indexes hot-swappable, per-reader profiles,
  trials-as-data, operational failures findable in one HTTP call
- New phases 16 (hot-swap generations), 17 (model profiles + dataset
  bindings), 18 (Lance vs Parquet+sidecar evaluation)
- Known ceilings table documents the 5M vector wall and escape hatches
- ADR-017 (federation), ADR-018 (append-log pattern) added
- EXECUTION_PLAN.md sequences phases B-E with success gates and
  decision rules

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 01:50:05 -05:00


//! HNSW (Hierarchical Navigable Small World) index for fast approximate nearest neighbor search.
//! Wraps instant-distance to provide <50ms search over 100K+ vectors.
//! Falls back to brute-force for small datasets.
use instant_distance::{Builder, HnswMap, Search, Point};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::store::StoredEmbedding;
use crate::trial::HnswConfig;
/// A vector point for HNSW — wraps an owned f32 vector with cosine distance.
#[derive(Clone)]
struct VecPoint(Vec<f32>);

impl Point for VecPoint {
    fn distance(&self, other: &Self) -> f32 {
        // Cosine distance = 1 - cosine_similarity
        let dot: f32 = self.0.iter().zip(other.0.iter()).map(|(a, b)| a * b).sum();
        let norm_a: f32 = self.0.iter().map(|x| x * x).sum::<f32>().sqrt();
        let norm_b: f32 = other.0.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm_a == 0.0 || norm_b == 0.0 {
            return 1.0;
        }
        1.0 - (dot / (norm_a * norm_b))
    }
}
/// HNSW search result.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HnswResult {
    pub source: String,
    pub doc_id: String,
    pub chunk_idx: u32,
    pub chunk_text: String,
    pub score: f32, // cosine similarity (1.0 = identical)
}
/// An HNSW index built from stored embeddings.
pub struct HnswIndex {
    map: HnswMap<VecPoint, usize>, // value is index into metadata vec
    metadata: Vec<EmbeddingMeta>,
}

#[derive(Clone)]
struct EmbeddingMeta {
    source: String,
    doc_id: String,
    chunk_idx: u32,
    chunk_text: String,
}

/// Shared, thread-safe index store.
#[derive(Clone)]
pub struct HnswStore {
    indexes: Arc<RwLock<std::collections::HashMap<String, Arc<HnswIndex>>>>,
}
impl HnswStore {
    pub fn new() -> Self {
        Self {
            indexes: Arc::new(RwLock::new(std::collections::HashMap::new())),
        }
    }

    /// Build an HNSW index from stored embeddings with default config.
    pub async fn build_index(
        &self,
        index_name: &str,
        embeddings: Vec<StoredEmbedding>,
    ) -> Result<BuildStats, String> {
        self.build_index_with_config(index_name, embeddings, &HnswConfig::default()).await
    }
    /// Build an HNSW index from stored embeddings with explicit config.
    /// Used by the trial system — each trial calls this with different params.
    pub async fn build_index_with_config(
        &self,
        index_name: &str,
        embeddings: Vec<StoredEmbedding>,
        config: &HnswConfig,
    ) -> Result<BuildStats, String> {
        let n = embeddings.len();
        if n == 0 {
            return Err("no embeddings to index".into());
        }
        tracing::info!(
            "building HNSW '{}': {} vectors, ef_construction={} ef_search={} seed={:?}",
            index_name, n, config.ef_construction, config.ef_search, config.seed,
        );
        let start = std::time::Instant::now();

        // Separate points and metadata
        let mut points = Vec::with_capacity(n);
        let mut metadata = Vec::with_capacity(n);
        let mut values = Vec::with_capacity(n);
        for (i, emb) in embeddings.into_iter().enumerate() {
            points.push(VecPoint(emb.vector));
            metadata.push(EmbeddingMeta {
                source: emb.source,
                doc_id: emb.doc_id,
                chunk_idx: emb.chunk_idx,
                chunk_text: emb.chunk_text,
            });
            values.push(i);
        }

        // Build HNSW — the expensive part
        let mut builder = Builder::default()
            .ef_construction(config.ef_construction)
            .ef_search(config.ef_search);
        if let Some(seed) = config.seed {
            builder = builder.seed(seed);
        }
        let map = builder.build(points, values);
        let build_time = start.elapsed().as_secs_f32();
        tracing::info!("HNSW '{}' built: {} vectors in {:.1}s", index_name, n, build_time);

        let index = Arc::new(HnswIndex { map, metadata });
        self.indexes.write().await.insert(index_name.to_string(), index);
        Ok(BuildStats {
            index_name: index_name.to_string(),
            vectors: n,
            build_time_secs: build_time,
        })
    }
    /// Run a batch of search queries and return raw per-query latencies in microseconds.
    /// Also returns the retrieved doc_ids per query (for recall calculation).
    pub async fn bench_search(
        &self,
        index_name: &str,
        query_vectors: &[Vec<f32>],
        top_k: usize,
    ) -> Result<BenchResult, String> {
        let indexes = self.indexes.read().await;
        let index = indexes
            .get(index_name)
            .ok_or_else(|| format!("HNSW index not found: {index_name}"))?
            .clone();
        drop(indexes);

        let mut latencies_us = Vec::with_capacity(query_vectors.len());
        let mut retrieved: Vec<Vec<String>> = Vec::with_capacity(query_vectors.len());
        for qv in query_vectors {
            let query_point = VecPoint(qv.clone());
            let t0 = std::time::Instant::now();
            let mut search = Search::default();
            let results = index.map.search(&query_point, &mut search);
            let ids: Vec<String> = results
                .take(top_k)
                .map(|item| {
                    let meta_idx = *item.value;
                    index.metadata[meta_idx].doc_id.clone()
                })
                .collect();
            latencies_us.push(t0.elapsed().as_micros() as f32);
            retrieved.push(ids);
        }
        Ok(BenchResult { latencies_us, retrieved })
    }
    /// Search an HNSW index. Returns approximate nearest neighbors.
    pub async fn search(
        &self,
        index_name: &str,
        query: &[f32],
        top_k: usize,
    ) -> Result<Vec<HnswResult>, String> {
        let indexes = self.indexes.read().await;
        let index = indexes.get(index_name)
            .ok_or_else(|| format!("HNSW index not found: {index_name}"))?;
        let query_point = VecPoint(query.to_vec());
        let mut search = Search::default();
        let results = index.map.search(&query_point, &mut search);
        let mut out = Vec::with_capacity(top_k);
        for item in results.take(top_k) {
            let meta_idx = *item.value;
            let meta = &index.metadata[meta_idx];
            // Convert distance back to similarity
            let similarity = 1.0 - item.distance;
            out.push(HnswResult {
                source: meta.source.clone(),
                doc_id: meta.doc_id.clone(),
                chunk_idx: meta.chunk_idx,
                chunk_text: meta.chunk_text.clone(),
                score: similarity,
            });
        }
        Ok(out)
    }
    /// Check if an index exists.
    pub async fn has_index(&self, name: &str) -> bool {
        self.indexes.read().await.contains_key(name)
    }

    /// List all loaded indexes.
    pub async fn list(&self) -> Vec<String> {
        self.indexes.read().await.keys().cloned().collect()
    }

    /// Drop an index from memory.
    pub async fn drop(&self, name: &str) -> bool {
        self.indexes.write().await.remove(name).is_some()
    }
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct BuildStats {
    pub index_name: String,
    pub vectors: usize,
    pub build_time_secs: f32,
}

#[derive(Debug, Clone)]
pub struct BenchResult {
    pub latencies_us: Vec<f32>,
    pub retrieved: Vec<Vec<String>>,
}