Five threads of work landing as one milestone — all individually
verified end-to-end against real data; full release build clean;
all 46 unit tests pass.
## Phase 16.2 / 16.5 — autotune agent + ingest triggers
`vectord::agent` is a long-running tokio task that watches the trial
journal and autonomously proposes and runs new HNSW configs — distinct
from `autotune::run_autotune` (the synchronous one-shot grid).
Triggered by POST /vectors/agent/enqueue/{idx} or by the periodic
wake; ingest paths now push DatasetAppended events when an index's
source dataset gets re-ingested. Rate-limited (max_trials_per_hour)
and cooldown-gated so it can't saturate Ollama under live load.
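For orientation, the loop shape is roughly this (a minimal sketch;
`AgentEvent` and the channel wiring are illustrative stand-ins, not the
real vectord::agent types):

```rust
use std::time::Duration;
use tokio::{select, sync::mpsc, time};

// Hypothetical event type: in this sketch the enqueue route and the
// ingest hook both feed the same channel.
enum AgentEvent {
    Enqueued { idx: String },
    DatasetAppended { idx: String },
}

async fn agent_loop(mut events: mpsc::Receiver<AgentEvent>, wake_every: Duration) {
    let mut tick = time::interval(wake_every);
    loop {
        select! {
            // Periodic wake: rescan the trial journal, maybe propose.
            _ = tick.tick() => { /* propose + run if the rate limit allows */ }
            // POST /vectors/agent/enqueue/{idx} and DatasetAppended land here.
            Some(event) = events.recv() => {
                // max_trials_per_hour + cooldown checks gate the actual
                // trial run so Ollama isn't saturated under live load.
                let _ = event;
            }
        }
    }
}
```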
The proposer is ε-greedy around the current champion: with probability
0.25 it samples a random config from the full bounds; otherwise it
perturbs the champion by a small ± delta on both axes, then dedups
against history. It is deterministic — the RNG is seeded from
history.len(), so the same journal state proposes the same next
config (which helps offline replay debugging).
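A minimal sketch of that proposal step (assuming rand 0.8; the axis
names, bounds, and delta sizes here are placeholders, not the real
tuning ranges):

```rust
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

#[derive(Clone, Copy, PartialEq)]
struct HnswConfig { m: u32, ef_construction: u32 }

const M_BOUNDS: (u32, u32) = (4, 64);    // placeholder bounds
const EF_BOUNDS: (u32, u32) = (32, 512);

fn propose(champion: HnswConfig, history: &[HnswConfig]) -> Option<HnswConfig> {
    // Deterministic: same journal state -> same RNG stream -> same proposal.
    let mut rng = StdRng::seed_from_u64(history.len() as u64);
    for _ in 0..64 {
        let candidate = if rng.gen::<f64>() < 0.25 {
            // Explore: uniform sample over the full bounds.
            HnswConfig {
                m: rng.gen_range(M_BOUNDS.0..=M_BOUNDS.1),
                ef_construction: rng.gen_range(EF_BOUNDS.0..=EF_BOUNDS.1),
            }
        } else {
            // Exploit: perturb the champion by a small signed delta on both axes.
            HnswConfig {
                m: (champion.m as i64 + rng.gen_range(-4..=4))
                    .clamp(M_BOUNDS.0 as i64, M_BOUNDS.1 as i64) as u32,
                ef_construction: (champion.ef_construction as i64 + rng.gen_range(-32..=32))
                    .clamp(EF_BOUNDS.0 as i64, EF_BOUNDS.1 as i64) as u32,
            }
        };
        // Dedup against history; retry a bounded number of times.
        if !history.contains(&candidate) {
            return Some(candidate);
        }
    }
    None
}
```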
`[agent]` config section in lakehouse.toml; opt-in via `enabled=true`.
## Federation Layer 2 — runtime bucket lifecycle + per-index scoping
`BucketRegistry.buckets` moved to `std::sync::RwLock<HashMap>` so
buckets can be added/removed after startup. POST /storage/buckets
provisions at runtime; DELETE /storage/buckets/{name} unregisters
(refuses primary/rescue with 403). Local-backend buckets get their
root directory auto-created.
`IndexMeta.bucket` (default "primary" via serde) records each index's
home bucket. `TrialJournal` and `PromotionRegistry` now hold an
Arc<BucketRegistry> + IndexRegistry and resolve the target store per
index via IndexMeta.bucket, as sketched below.
PromotionRegistry::list_all scans every bucket and dedups by
index_name. Pre-federation indexes keep working unchanged — they just
default to primary.
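The resolver pattern reduces to something like this (a sketch; the
handle type and method names are illustrative):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct ObjectStoreHandle; // stand-in for a per-bucket store

struct BucketRegistry {
    // RwLock so buckets can be provisioned/unregistered after startup.
    buckets: RwLock<HashMap<String, Arc<ObjectStoreHandle>>>,
}

struct IndexMeta {
    bucket: String, // serde default: "primary"
}

impl BucketRegistry {
    /// Resolve the store an index lives in.
    fn store_for(&self, meta: &IndexMeta) -> Option<Arc<ObjectStoreHandle>> {
        // Pre-federation metadata deserializes bucket as "primary",
        // so old indexes resolve here unchanged.
        self.buckets.read().ok()?.get(&meta.bucket).cloned()
    }
}
```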
`ModelProfile.bucket: Option<String>` declares per-profile artifact
home. POST /vectors/profile/{id}/activate auto-provisions the
profile's bucket under storage.profile_root if not yet registered.
EvalSets stay primary-only for now — a noted gap, low-risk to extend
later with the same resolver pattern.
## Phase 17 — VRAM-aware two-profile gate
Sidecar gains POST /admin/unload (Ollama keep_alive=0 trick — forces
immediate VRAM release), POST /admin/preload (keep_alive=5m with
empty prompt, takes the slot warm), and GET /admin/vram (combines
nvidia-smi snapshot with Ollama /api/ps). Exposed via aibridge as
unload_model / preload_model / vram_snapshot.
`VectorState.active_profile` is the GPU-slot singleton —
Arc<RwLock<Option<ActiveProfileSlot>>>. activate_profile checks for
a previous profile with a different ollama_name and unloads it
before preloading the new one; same-model reactivations skip the
unload (Ollama no-ops). New routes: POST
/vectors/profile/{id}/deactivate (unload + clear slot) and GET
/vectors/profile/active.
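The swap decision is roughly this (a sketch; the slot is assumed to be
tokio's RwLock, and `AiClient` is the aibridge client listed at the end
of this note):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone)]
pub struct ActiveProfileSlot {
    pub profile_id: String,
    pub ollama_name: String,
}

/// Process-wide GPU slot singleton.
pub type ActiveProfile = Arc<RwLock<Option<ActiveProfileSlot>>>;

pub async fn swap_profile(
    slot: &ActiveProfile,
    ai: &AiClient,
    next: ActiveProfileSlot,
) -> Result<(), String> {
    let mut guard = slot.write().await;
    if let Some(prev) = guard.as_ref() {
        // Different model: free its VRAM before warming the new one.
        // Same model: skip the unload; Ollama no-ops on the preload.
        if prev.ollama_name != next.ollama_name {
            ai.unload_model(&prev.ollama_name).await?;
        }
    }
    ai.preload_model(&next.ollama_name).await?;
    *guard = Some(next);
    Ok(())
}
```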
Verified live: a staffing-recruiter (qwen2.5) → docs-assistant
(mistral) swap freed qwen2.5 from VRAM and loaded mistral.
nomic-embed-text persists across swaps because both profiles use it —
a free optimization that fell out of the design. Scoped search
correctly returns 403 on cross-profile access in both directions.
## MySQL streaming connector
`crates/ingestd/src/my_stream.rs` mirrors pg_stream.rs for MySQL.
Pure-Rust `mysql_async` driver (default-features=false to avoid C
deps). Same OFFSET pagination, same Parquet-streaming write shape.
Type mapping per ADR-010: int/bigint → Int32/Int64, decimal/float
→ Float64, tinyint(1)/bool → Boolean, everything else → Utf8, with
date/time/json/uuid stringified via their Display impls as the
fallback.
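The mapping itself is small (a sketch using the arrow_schema crate from
the workspace's Arrow 55; the real matcher in my_stream.rs may cover
more type aliases):

```rust
use arrow_schema::DataType;

/// ADR-010 type mapping, MySQL column type -> Arrow type.
fn arrow_type_for(mysql_type: &str) -> DataType {
    match mysql_type {
        "tinyint(1)" | "bool" | "boolean" => DataType::Boolean,
        "smallint" | "mediumint" | "int" | "integer" => DataType::Int32,
        "bigint" => DataType::Int64,
        t if t.starts_with("decimal") || t == "float" || t == "double" => DataType::Float64,
        // date/time/json/uuid and anything unrecognized fall back to
        // Utf8, rendered through the value's Display impl.
        _ => DataType::Utf8,
    }
}
```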
POST /ingest/mysql is parallel to /ingest/db. Same PII auto-detection,
same lineage capture (source_system="mysql"), same agent-trigger
hook. `redact_dsn` was generalized — it was hardcoded to the length
of "postgresql://"; it now works for any scheme://user:pass@host/path
URL (a latent PII-leak fix for MySQL DSNs).
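A sketch of the generalized shape (the real function's signature and
edge-case handling may differ):

```rust
/// Mask the password in any scheme://user:pass@host/path DSN instead
/// of assuming a fixed "postgresql://" prefix length.
fn redact_dsn(dsn: &str) -> String {
    if let Some(scheme_end) = dsn.find("://") {
        let rest = &dsn[scheme_end + 3..];
        if let Some(at) = rest.rfind('@') {
            let auth = &rest[..at];
            if let Some(colon) = auth.find(':') {
                return format!(
                    "{}://{}:***{}",
                    &dsn[..scheme_end], // scheme, e.g. "mysql"
                    &auth[..colon],     // user kept, password masked
                    &rest[at..],        // "@host/path" tail unchanged
                );
            }
        }
    }
    dsn.to_string() // no credentials present; return unchanged
}
```

So `redact_dsn("mysql://app:s3cret@db:3306/crm")` yields
`mysql://app:***@db:3306/crm`.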
Verified live against MariaDB on localhost: 10 rows × 9 columns of
test data round-tripped through the int/varchar/decimal/tinyint/
datetime/text datatypes. PII detection auto-flagged name + email.
Aggregation queries through DataFusion match the source values
exactly.
## Phase 18 — Hybrid Parquet+HNSW ⊕ Lance backend (ADR-019)
`vectord-lance` is a new firewall crate. Lance pulls Arrow 57 and
DataFusion 52 — incompatible with the rest of the workspace's
Arrow 55 / DataFusion 47. The firewall isolates that dependency tree:
the public API uses only std types (Vec<f32>, Vec<String>, Hit, Row,
*Stats), so no Arrow types cross the crate boundary and nothing
propagates to vectord. This is the ADR-019 path that hadn't shipped
until now.
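The boundary looks roughly like this (a stub sketch; the names
approximate the crate's surface and the bodies are elided):

```rust
// vectord-lance public surface: only std types in signatures, so the
// Arrow 57 / DataFusion 52 tree never leaks into the Arrow 55 workspace.
pub struct Hit {
    pub doc_id: String,
    pub score: f32,
}

pub struct Row {
    pub doc_id: String,
    pub fields: Vec<(String, String)>,
}

pub struct IndexStats {
    pub rows: usize,
    pub has_vector_index: bool,
}

pub struct LanceVectorStore {
    // lance::Dataset and friends live behind this wall, private to the crate.
}

impl LanceVectorStore {
    pub async fn search(&self, _query: Vec<f32>, _k: usize) -> Result<Vec<Hit>, String> {
        todo!("IVF_PQ scan internally; Arrow batches flattened to Hits at the boundary")
    }
    pub async fn append(&self, _rows: Vec<Row>, _vectors: Vec<Vec<f32>>) -> Result<usize, String> {
        todo!("native Lance fragment append")
    }
    pub async fn stats(&self) -> Result<IndexStats, String> {
        todo!("row count + index presence")
    }
}
```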
`vectord::lance_backend::LanceRegistry` lazy-creates a
LanceVectorStore per index, resolving bucket → URI via the
conventional local-bucket layout. `IndexMeta.vector_backend` and
`ModelProfile.vector_backend` carry the choice (default Parquet so
existing indexes unchanged).
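Lazy creation in the registry is along these lines (a sketch; the
`<bucket_root>/lance/<idx>.lance` layout shown is an assumed stand-in
for the conventional local-bucket layout):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

struct LanceVectorStore { uri: String } // stand-in for the firewall type

struct LanceRegistry {
    stores: Mutex<HashMap<String, Arc<LanceVectorStore>>>,
}

impl LanceRegistry {
    /// Resolve idx -> store, creating it on first use from the index's
    /// home-bucket URI.
    async fn store_for(&self, idx: &str, bucket_root: &str) -> Arc<LanceVectorStore> {
        let mut stores = self.stores.lock().await;
        stores
            .entry(idx.to_string())
            .or_insert_with(|| {
                Arc::new(LanceVectorStore {
                    uri: format!("{bucket_root}/lance/{idx}.lance"),
                })
            })
            .clone()
    }
}
```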
Six routes under /vectors/lance/*:
- migrate/{idx}: convert binary-blob Parquet → Lance FixedSizeList
- index/{idx}: build IVF_PQ
- search/{idx}: vector search (embed via sidecar)
- doc/{idx}/{doc_id}: random-access single-row fetch
- append/{idx}: native fragment append
- stats/{idx}: row count + index presence
Verified live on the real resumes_100k_v2 corpus (100K × 768d):
- Migrate: 0.57s
- Build IVF_PQ index: 16.2s (matches ADR-019 bench; 14× faster than
HNSW's 230s for the same data)
- Search end-to-end (Ollama embed + Lance scan): 23-53ms
- Random doc_id fetch: 5-7ms (filter scan; faster than Parquet's
~35ms full-file scan, slower than the bench's 311us positional
take — would close that gap with a scalar btree on doc_id)
- Append 100 rows: 3.3ms / +320KB on disk vs Parquet's required
full ~330MB rewrite — the structural win
- Index survives append; both backends coexist cleanly
## Known follow-ups not in this milestone
- ModelProfile.vector_backend doesn't yet auto-route
  /vectors/profile/{id}/search to Lance; callers go through
  /vectors/lance/* directly
- Scalar btree on doc_id (closes the 5-7ms → ~300us gap)
- vectord-lance built default-features=false → no S3 yet
- IVF_PQ recall not measured (ADR-019 caveat) — needs a Lance-aware
variant of the eval harness
- Watcher-path ingest doesn't push agent triggers (HTTP paths do)
- EvalSets still primary-only (federation gap)
- No PATCH endpoint to move an existing index between buckets
- The pre-existing storaged::append_log doctest fails to compile
  (malformed `{prefix}/` parses as a code fence) — left for a
  focused fix
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The aibridge sidecar client (Rust, 181 lines):
```rust
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;

/// HTTP client for the Python AI sidecar.
#[derive(Clone)]
pub struct AiClient {
    client: Client,
    base_url: String,
}

// -- Request/Response types --

#[derive(Serialize, Deserialize)]
pub struct EmbedRequest {
    pub texts: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct EmbedResponse {
    pub embeddings: Vec<Vec<f64>>,
    pub model: String,
    pub dimensions: usize,
}

#[derive(Serialize, Deserialize)]
pub struct GenerateRequest {
    pub prompt: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_tokens: Option<u32>,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct GenerateResponse {
    pub text: String,
    pub model: String,
    pub tokens_evaluated: Option<u64>,
    pub tokens_generated: Option<u64>,
}

#[derive(Serialize, Deserialize)]
pub struct RerankRequest {
    pub query: String,
    pub documents: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub top_k: Option<usize>,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct ScoredDocument {
    pub index: usize,
    pub text: String,
    pub score: f64,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct RerankResponse {
    pub results: Vec<ScoredDocument>,
    pub model: String,
}

impl AiClient {
    pub fn new(base_url: &str) -> Self {
        let client = Client::builder()
            .timeout(Duration::from_secs(120))
            .build()
            .expect("failed to build HTTP client");
        Self {
            client,
            base_url: base_url.trim_end_matches('/').to_string(),
        }
    }

    /// Liveness probe against the sidecar's /health endpoint.
    pub async fn health(&self) -> Result<serde_json::Value, String> {
        let resp = self
            .client
            .get(format!("{}/health", self.base_url))
            .send()
            .await
            .map_err(|e| format!("sidecar unreachable: {e}"))?;
        resp.json().await.map_err(|e| format!("invalid response: {e}"))
    }

    /// Embed a batch of texts via the sidecar's /embed endpoint.
    pub async fn embed(&self, req: EmbedRequest) -> Result<EmbedResponse, String> {
        let resp = self
            .client
            .post(format!("{}/embed", self.base_url))
            .json(&req)
            .send()
            .await
            .map_err(|e| format!("embed request failed: {e}"))?;

        if !resp.status().is_success() {
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("embed error ({status}): {text}"));
        }
        resp.json().await.map_err(|e| format!("embed parse error: {e}"))
    }

    /// Text generation via the sidecar's /generate endpoint.
    pub async fn generate(&self, req: GenerateRequest) -> Result<GenerateResponse, String> {
        let resp = self
            .client
            .post(format!("{}/generate", self.base_url))
            .json(&req)
            .send()
            .await
            .map_err(|e| format!("generate request failed: {e}"))?;

        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("generate error: {text}"));
        }
        resp.json().await.map_err(|e| format!("generate parse error: {e}"))
    }

    /// Rerank candidate documents against a query via /rerank.
    pub async fn rerank(&self, req: RerankRequest) -> Result<RerankResponse, String> {
        let resp = self
            .client
            .post(format!("{}/rerank", self.base_url))
            .json(&req)
            .send()
            .await
            .map_err(|e| format!("rerank request failed: {e}"))?;

        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("rerank error: {text}"));
        }
        resp.json().await.map_err(|e| format!("rerank parse error: {e}"))
    }

    /// Force Ollama to unload the named model from VRAM (keep_alive=0).
    /// Used for predictable profile swaps — without this, Ollama holds a
    /// model for its configured TTL (default 5min) and the previous
    /// profile's model can linger in VRAM next to the new one.
    pub async fn unload_model(&self, model: &str) -> Result<serde_json::Value, String> {
        let resp = self
            .client
            .post(format!("{}/admin/unload", self.base_url))
            .json(&serde_json::json!({ "model": model }))
            .send()
            .await
            .map_err(|e| format!("unload request failed: {e}"))?;
        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("unload error: {text}"));
        }
        resp.json().await.map_err(|e| format!("unload parse error: {e}"))
    }

    /// Ask Ollama to load the named model into VRAM proactively. Makes
    /// the first real request after profile activation fast (no cold-load
    /// latency).
    pub async fn preload_model(&self, model: &str) -> Result<serde_json::Value, String> {
        let resp = self
            .client
            .post(format!("{}/admin/preload", self.base_url))
            .json(&serde_json::json!({ "model": model }))
            .send()
            .await
            .map_err(|e| format!("preload request failed: {e}"))?;
        if !resp.status().is_success() {
            let text = resp.text().await.unwrap_or_default();
            return Err(format!("preload error: {text}"));
        }
        resp.json().await.map_err(|e| format!("preload parse error: {e}"))
    }

    /// GPU + loaded-model snapshot from the sidecar. Combines nvidia-smi
    /// output (if available) with Ollama's /api/ps.
    pub async fn vram_snapshot(&self) -> Result<serde_json::Value, String> {
        let resp = self
            .client
            .get(format!("{}/admin/vram", self.base_url))
            .send()
            .await
            .map_err(|e| format!("vram request failed: {e}"))?;
        resp.json().await.map_err(|e| format!("vram parse error: {e}"))
    }
}
```