Five threads of work landing as one milestone: all individually
verified end-to-end against real data, the full release build is clean,
and all 46 unit tests pass.
## Phase 16.2 / 16.5 — autotune agent + ingest triggers
`vectord::agent` is a long-running tokio task that watches the trial
journal and autonomously proposes + runs new HNSW configs. Distinct
from `autotune::run_autotune` (synchronous one-shot grid). Triggered
by POST /vectors/agent/enqueue/{idx} or by the periodic wake; ingest
paths now push DatasetAppended events when an index's source dataset
gets re-ingested. Rate-limited (max_trials_per_hour) and cooldown-
gated so it can't saturate Ollama under live load.
The proposer is ε-greedy around the current champion: with prob 0.25
sample random from full bounds, otherwise perturb champion ± small
delta on both axes. Dedup against history. Deterministic — RNG seeded
from history.len() so the same journal state proposes the same next
config (helps offline replay debugging).
`[agent]` config section in lakehouse.toml; opt-in via enabled=true.
## Federation Layer 2 — runtime bucket lifecycle + per-index scoping
`BucketRegistry.buckets` moved to `std::sync::RwLock<HashMap>` so
buckets can be added/removed after startup. POST /storage/buckets
provisions at runtime; DELETE /storage/buckets/{name} unregisters
(refuses primary/rescue with 403). Local-backend buckets get their
root directory auto-created.
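The lifecycle guard reduces to a small pattern. A sketch under loud assumptions: the value type, root URIs, and method names here are made up (the real registry resolves storage backends, not strings), but the `RwLock<HashMap>` shape and the reserved-bucket refusal are the point.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

pub struct BucketRegistry {
    // RwLock so buckets can be added/removed after startup.
    buckets: RwLock<HashMap<String, String>>, // name -> root URI (illustrative)
}

impl BucketRegistry {
    pub fn new() -> Self {
        let mut m = HashMap::new();
        m.insert("primary".to_string(), "file:///data/primary".to_string());
        m.insert("rescue".to_string(), "file:///data/rescue".to_string());
        Self { buckets: RwLock::new(m) }
    }

    /// Runtime provisioning (backs POST /storage/buckets).
    pub fn provision(&self, name: &str, root: &str) -> Result<(), String> {
        let mut b = self.buckets.write().unwrap();
        if b.contains_key(name) {
            return Err(format!("bucket already registered: {name}"));
        }
        b.insert(name.to_string(), root.to_string());
        Ok(())
    }

    /// Unregister a bucket; refuses the reserved primary/rescue buckets
    /// (the HTTP layer maps this refusal to a 403).
    pub fn unregister(&self, name: &str) -> Result<(), String> {
        if name == "primary" || name == "rescue" {
            return Err(format!("refusing to unregister reserved bucket: {name}"));
        }
        self.buckets.write().unwrap()
            .remove(name)
            .map(|_| ())
            .ok_or_else(|| format!("bucket not found: {name}"))
    }
}
```

Write-locking only inside the two mutating methods keeps the read path (bucket resolution on every request) contention-free under normal load.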
`IndexMeta.bucket` (default "primary" via serde) records each index's
home bucket. `TrialJournal` and `PromotionRegistry` now hold
Arc<BucketRegistry> + IndexRegistry; they resolve target store per-
index via IndexMeta.bucket. PromotionRegistry::list_all scans every
bucket and dedups by index_name. Pre-federation indexes keep working
unchanged — they just default to primary.
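The backward-compatibility story is just a defaulted lookup plus a first-wins dedup. A hypothetical sketch (field and function names invented for illustration; the real code goes through serde defaults and Arc-shared registries):

```rust
use std::collections::HashSet;

// Stand-in for IndexMeta: a pre-federation manifest simply lacks the
// bucket field, which models as None here.
struct IndexMeta { bucket: Option<String> }

/// Resolve which bucket holds an index's artifacts. Pre-federation
/// indexes resolve to "primary" and keep working unchanged.
fn resolve_bucket(meta: &IndexMeta) -> &str {
    meta.bucket.as_deref().unwrap_or("primary")
}

/// Mirror of the list_all shape: scan every bucket's listing and
/// dedup by index name, first occurrence winning.
fn list_all_dedup(per_bucket: Vec<Vec<String>>) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for names in per_bucket {
        for n in names {
            if seen.insert(n.clone()) {
                out.push(n);
            }
        }
    }
    out
}
```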
`ModelProfile.bucket: Option<String>` declares per-profile artifact
home. POST /vectors/profile/{id}/activate auto-provisions the
profile's bucket under storage.profile_root if not yet registered.
EvalSets stay primary-only for now — noted gap, low-risk to extend
later with the same resolver pattern.
## Phase 17 — VRAM-aware two-profile gate
Sidecar gains POST /admin/unload (Ollama keep_alive=0 trick — forces
immediate VRAM release), POST /admin/preload (keep_alive=5m with
empty prompt, takes the slot warm), and GET /admin/vram (combines
nvidia-smi snapshot with Ollama /api/ps). Exposed via aibridge as
unload_model / preload_model / vram_snapshot.
`VectorState.active_profile` is the GPU-slot singleton —
Arc<RwLock<Option<ActiveProfileSlot>>>. activate_profile checks for
a previous profile with a different ollama_name and unloads it
before preloading the new one; same-model reactivations skip the
unload (Ollama no-ops). New routes:
POST /vectors/profile/{id}/deactivate (unload + clear slot),
GET /vectors/profile/active.
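The swap decision itself is small enough to sketch. Assumptions flagged: this uses a synchronous `RwLock` instead of tokio's, the struct and method names are illustrative, and the actual unload/preload HTTP calls are elided; `activate` just reports which model (if any) must be unloaded first.

```rust
use std::sync::{Arc, RwLock};

#[derive(Clone, PartialEq, Debug)]
pub struct ActiveProfileSlot {
    pub profile_id: String,
    pub ollama_name: String,
}

// The GPU-slot singleton: at most one profile's model is "active".
pub struct GpuGate {
    pub active: Arc<RwLock<Option<ActiveProfileSlot>>>,
}

impl GpuGate {
    /// Take the slot for a profile. Returns the model to unload first,
    /// or None when the previous model is the same (same-model
    /// reactivations skip the unload; the preload then no-ops).
    pub fn activate(&self, profile_id: &str, ollama_name: &str) -> Option<String> {
        let mut slot = self.active.write().unwrap();
        let to_unload = match slot.as_ref() {
            Some(prev) if prev.ollama_name != ollama_name => Some(prev.ollama_name.clone()),
            _ => None,
        };
        *slot = Some(ActiveProfileSlot {
            profile_id: profile_id.to_string(),
            ollama_name: ollama_name.to_string(),
        });
        to_unload
    }

    /// Clear the slot, returning the model to unload (if any).
    pub fn deactivate(&self) -> Option<String> {
        self.active.write().unwrap().take().map(|s| s.ollama_name)
    }
}
```

Holding the write lock across the whole decision is what makes the slot a real singleton: two concurrent activations cannot both conclude "nothing to unload".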
Verified live: staffing-recruiter (qwen2.5) → docs-assistant
(mistral) swap freed qwen2.5 from VRAM and loaded mistral. nomic-
embed-text persists across swaps because both profiles use it —
free optimization that fell out of the design. Scoped search
correctly 403s cross-profile in both directions.
## MySQL streaming connector
`crates/ingestd/src/my_stream.rs` mirrors pg_stream.rs for MySQL.
Pure-rust `mysql_async` driver (default-features=false to avoid C
deps). Same OFFSET pagination, same Parquet-streaming write shape.
Type mapping per ADR-010: int/bigint → Int32/Int64, decimal/float
→ Float64, tinyint(1)/bool → Boolean, everything else → Utf8 with
fallback parsers for date/time/json/uuid via Display.
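That mapping reads naturally as a match on the raw MySQL column type. A dependency-free sketch: the `ColType` enum stands in for Arrow's `DataType`, the function name is invented, and only the mappings the ADR names are encoded; everything else falls through to Utf8.

```rust
// Stand-in for arrow's DataType so the sketch needs no crates.
#[derive(Debug, PartialEq)]
enum ColType { Int32, Int64, Float64, Boolean, Utf8 }

/// MySQL column type -> logical Arrow type, per the ADR-010 mapping
/// described above. Unrecognized types (date/time/json/uuid/...) fall
/// back to Utf8 with values rendered via Display.
fn map_mysql_type(raw: &str) -> ColType {
    let t = raw.to_ascii_lowercase();
    match t.as_str() {
        "tinyint(1)" | "bool" => ColType::Boolean,
        "int" => ColType::Int32,
        "bigint" => ColType::Int64,
        t if t.starts_with("decimal") || t == "float" || t == "double" => ColType::Float64,
        _ => ColType::Utf8,
    }
}
```

The `tinyint(1)` arm has to come before any general integer handling, since MySQL reports booleans with exactly that spelled-out width.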
POST /ingest/mysql parallel to /ingest/db. Same PII auto-detection,
same lineage capture (source_system="mysql"), same agent-trigger
hook. `redact_dsn` generalized — was hardcoded to "postgresql://"
length, now works for any scheme://user:pass@host/path URL (latent
PII leak fix for MySQL DSNs).
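A scheme-agnostic redaction can be sketched with plain string slicing (illustrative only; the real `redact_dsn` may differ, and this deliberately ignores exotic cases like passwords containing `@`):

```rust
/// Redact the password in any scheme://user:pass@host/path DSN.
/// Works for postgresql://, mysql://, or any other scheme; DSNs
/// without credentials pass through unchanged.
fn redact_dsn(dsn: &str) -> String {
    // Find the scheme separator instead of assuming a fixed prefix
    // length -- this is the generalization over the old
    // "postgresql://"-only version.
    let Some(scheme_end) = dsn.find("://") else { return dsn.to_string() };
    let rest = &dsn[scheme_end + 3..];
    // Only the userinfo section (before the first '@') can hold a password.
    let Some(at) = rest.find('@') else { return dsn.to_string() };
    let userinfo = &rest[..at];
    match userinfo.find(':') {
        Some(colon) => format!(
            "{}://{}:***@{}",
            &dsn[..scheme_end],
            &userinfo[..colon],
            &rest[at + 1..]
        ),
        None => dsn.to_string(), // user but no password
    }
}
```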
Verified live against MariaDB on localhost: 10 rows × 9 columns of
test data round-tripped through datatypes int/varchar/decimal/
tinyint/datetime/text. PII detection auto-flagged name + email.
Aggregation queries through DataFusion match the source values
exactly.
## Phase 18 — Hybrid Parquet+HNSW ⊕ Lance backend (ADR-019)
`vectord-lance` is a new firewall crate. Lance pulls Arrow 57 and
DataFusion 52 — incompatible with the rest of the workspace's
Arrow 55 / DataFusion 47. The firewall isolates that dep tree:
public API uses only std types (Vec<f32>, Vec<String>, Hit, Row,
*Stats), so no Arrow types cross the crate boundary and nothing
propagates to vectord. The ADR-019 path that didn't ship until now.
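The firewall pattern is easiest to see from the signature side. A hypothetical sketch of such a crate's public surface, with a toy in-memory impl standing in for the Lance-backed store (all names and the dot-product scoring are invented for illustration): nothing but std types appears in any signature, so the Arrow 57 / DataFusion 52 tree linked inside never leaks into the workspace's type graph.

```rust
// Public surface of a dependency-firewall crate: only std types cross.
pub struct Hit { pub doc_id: String, pub score: f32 }
pub struct Row { pub doc_id: String, pub fields: Vec<(String, String)> }

pub trait VectorStore {
    fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<Hit>, String>;
    fn fetch(&self, doc_id: &str) -> Result<Option<Row>, String>;
    fn append(&mut self, rows: Vec<(String, Vec<f32>)>) -> Result<usize, String>;
}

// Toy in-memory stand-in for the real Lance-backed implementation.
pub struct MemStore { pub rows: Vec<(String, Vec<f32>)> }

impl VectorStore for MemStore {
    fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<Hit>, String> {
        let mut hits: Vec<Hit> = self.rows.iter()
            .map(|(id, v)| Hit {
                doc_id: id.clone(),
                // dot-product score stands in for the real ANN search
                score: v.iter().zip(&query).map(|(a, b)| a * b).sum(),
            })
            .collect();
        hits.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
        hits.truncate(k);
        Ok(hits)
    }

    fn fetch(&self, doc_id: &str) -> Result<Option<Row>, String> {
        Ok(self.rows.iter()
            .find(|(id, _)| id == doc_id)
            .map(|(id, _)| Row { doc_id: id.clone(), fields: Vec::new() }))
    }

    fn append(&mut self, rows: Vec<(String, Vec<f32>)>) -> Result<usize, String> {
        let n = rows.len();
        self.rows.extend(rows);
        Ok(n)
    }
}
```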
`vectord::lance_backend::LanceRegistry` lazy-creates a
LanceVectorStore per index, resolving bucket → URI via the
conventional local-bucket layout. `IndexMeta.vector_backend` and
`ModelProfile.vector_backend` carry the choice (default Parquet so
existing indexes unchanged).
Six routes under /vectors/lance/*:
- migrate/{idx}: convert binary-blob Parquet → Lance FixedSizeList
- index/{idx}: build IVF_PQ
- search/{idx}: vector search (embed via sidecar)
- doc/{idx}/{doc_id}: random row fetch
- append/{idx}: native fragment append
- stats/{idx}: row count + index presence
Verified live on the real resumes_100k_v2 corpus (100K × 768d):
- Migrate: 0.57s
- Build IVF_PQ index: 16.2s (matches ADR-019 bench; 14× faster than
HNSW's 230s for the same data)
- Search end-to-end (Ollama embed + Lance scan): 23-53ms
- Random doc_id fetch: 5-7ms (filter scan; faster than Parquet's
~35ms full-file scan, slower than the bench's 311us positional
take — would close that gap with a scalar btree on doc_id)
- Append 100 rows: 3.3ms / +320KB on disk vs Parquet's required
full ~330MB rewrite — the structural win
- Index survives append; both backends coexist cleanly
## Known follow-ups not in this milestone
- ModelProfile.vector_backend doesn't yet auto-route
  /vectors/profile/{id}/search to Lance; callers go through
  /vectors/lance/* directly
- Scalar btree on doc_id (closes the 5-7ms → ~300us gap)
- vectord-lance built default-features=false → no S3 yet
- IVF_PQ recall not measured (ADR-019 caveat) — needs a Lance-aware
variant of the eval harness
- Watcher-path ingest doesn't push agent triggers (HTTP paths do)
- EvalSets still primary-only (federation gap)
- No PATCH endpoint to move an existing index between buckets
- The pre-existing storaged::append_log doctest fails to compile
  (malformed `{prefix}/` parses as a code fence); left for a
  focused fix
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
417 lines
12 KiB
Rust
```rust
use axum::{
    Json, Router,
    extract::{Path, State},
    http::StatusCode,
    response::IntoResponse,
    routing::{get, post},
};
use serde::{Deserialize, Serialize};
use shared::types::{DatasetId, ObjectRef, SchemaFingerprint};
use uuid::Uuid;

use crate::registry::Registry;

pub fn router(registry: Registry) -> Router {
    Router::new()
        .route("/health", get(health))
        .route("/datasets", post(create_dataset))
        .route("/datasets", get(list_datasets))
        .route("/datasets/{id}", get(get_dataset))
        .route("/datasets/by-name/{name}", get(get_dataset_by_name))
        .route("/datasets/by-name/{name}/metadata", post(update_metadata))
        .route("/datasets/by-name/{name}/resync", post(resync_dataset))
        .route("/resync-missing", post(resync_all_missing))
        .route("/migrate-buckets", post(migrate_buckets))
        // Phase D: AI-safe views
        .route("/views", post(create_view).get(list_views))
        .route("/views/{name}", get(get_view).delete(delete_view))
        // Phase E: soft-delete tombstones
        .route("/datasets/by-name/{name}/tombstone", post(tombstone_rows).get(list_tombstones))
        // Phase 17: model profiles
        .route("/profiles", post(create_profile).get(list_profiles))
        .route("/profiles/{id}", get(get_profile).delete(delete_profile))
        .with_state(registry)
}

async fn health() -> &'static str {
    "catalogd ok"
}

#[derive(Deserialize)]
struct CreateDatasetRequest {
    name: String,
    schema_fingerprint: String,
    objects: Vec<ObjectRefRequest>,
}

#[derive(Deserialize)]
struct ObjectRefRequest {
    bucket: String,
    key: String,
    size_bytes: u64,
}

#[derive(Serialize)]
struct DatasetResponse {
    id: String,
    name: String,
    schema_fingerprint: String,
    objects: Vec<ObjectRefResponse>,
    created_at: String,
    updated_at: String,
    // Rich metadata
    description: String,
    owner: String,
    sensitivity: Option<shared::types::Sensitivity>,
    columns: Vec<shared::types::ColumnMeta>,
    lineage: Option<shared::types::Lineage>,
    freshness: Option<shared::types::FreshnessContract>,
    tags: Vec<String>,
    row_count: Option<u64>,
}

#[derive(Serialize)]
struct ObjectRefResponse {
    bucket: String,
    key: String,
    size_bytes: u64,
    created_at: String,
}

impl From<&shared::types::DatasetManifest> for DatasetResponse {
    fn from(m: &shared::types::DatasetManifest) -> Self {
        Self {
            id: m.id.to_string(),
            name: m.name.clone(),
            schema_fingerprint: m.schema_fingerprint.0.clone(),
            objects: m.objects.iter().map(|o| ObjectRefResponse {
                bucket: o.bucket.clone(),
                key: o.key.clone(),
                size_bytes: o.size_bytes,
                created_at: o.created_at.to_rfc3339(),
            }).collect(),
            created_at: m.created_at.to_rfc3339(),
            updated_at: m.updated_at.to_rfc3339(),
            description: m.description.clone(),
            owner: m.owner.clone(),
            sensitivity: m.sensitivity.clone(),
            columns: m.columns.clone(),
            lineage: m.lineage.clone(),
            freshness: m.freshness.clone(),
            tags: m.tags.clone(),
            row_count: m.row_count,
        }
    }
}

async fn create_dataset(
    State(registry): State<Registry>,
    Json(req): Json<CreateDatasetRequest>,
) -> impl IntoResponse {
    let now = chrono::Utc::now();
    let objects: Vec<ObjectRef> = req.objects.into_iter().map(|o| ObjectRef {
        bucket: o.bucket,
        key: o.key,
        size_bytes: o.size_bytes,
        created_at: now,
    }).collect();

    match registry.register(req.name, SchemaFingerprint(req.schema_fingerprint), objects).await {
        Ok(manifest) => {
            let resp = DatasetResponse::from(&manifest);
            Ok((StatusCode::CREATED, Json(resp)))
        }
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

async fn list_datasets(State(registry): State<Registry>) -> impl IntoResponse {
    let datasets = registry.list().await;
    let resp: Vec<DatasetResponse> = datasets.iter().map(DatasetResponse::from).collect();
    Json(resp)
}

async fn get_dataset(
    State(registry): State<Registry>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let uuid = Uuid::parse_str(&id).map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;
    let dataset_id = DatasetId(uuid);
    match registry.get(&dataset_id).await {
        Some(manifest) => Ok(Json(DatasetResponse::from(&manifest))),
        None => Err((StatusCode::NOT_FOUND, format!("dataset not found: {id}"))),
    }
}

async fn get_dataset_by_name(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    match registry.get_by_name(&name).await {
        Some(manifest) => Ok(Json(DatasetResponse::from(&manifest))),
        None => Err((StatusCode::NOT_FOUND, format!("dataset not found: {name}"))),
    }
}

async fn update_metadata(
    State(registry): State<Registry>,
    Path(name): Path<String>,
    Json(updates): Json<crate::registry::MetadataUpdate>,
) -> impl IntoResponse {
    match registry.update_metadata(&name, updates).await {
        Ok(manifest) => Ok(Json(DatasetResponse::from(&manifest))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

/// Re-read parquet footers for a single dataset and repopulate row_count
/// and columns from reality. Useful for repairing manifests whose metadata
/// was lost or never backfilled.
async fn resync_dataset(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    match registry.resync_from_parquet(&name).await {
        Ok(manifest) => Ok(Json(DatasetResponse::from(&manifest))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

#[derive(Serialize)]
struct ResyncAllResponse {
    succeeded: Vec<ResyncOk>,
    failed: Vec<ResyncErr>,
}

#[derive(Serialize)]
struct ResyncOk {
    name: String,
    row_count: u64,
}

#[derive(Serialize)]
struct ResyncErr {
    name: String,
    error: String,
}

/// Resync every dataset that currently has null row_count or empty columns.
async fn resync_all_missing(State(registry): State<Registry>) -> impl IntoResponse {
    let (ok, err) = registry.resync_missing().await;
    Json(ResyncAllResponse {
        succeeded: ok.into_iter().map(|(name, row_count)| ResyncOk { name, row_count }).collect(),
        failed: err.into_iter().map(|(name, error)| ResyncErr { name, error }).collect(),
    })
}

/// Federation layer 2 one-shot: normalize every ObjectRef.bucket field
/// to the canonical "primary" value. Idempotent — re-running once
/// everything is canonical is a safe no-op.
async fn migrate_buckets(State(registry): State<Registry>) -> impl IntoResponse {
    match registry.migrate_buckets_to_primary().await {
        Ok(report) => Ok(Json(report)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

// --- Phase D: AI-safe views ---

#[derive(Deserialize)]
struct CreateViewRequest {
    name: String,
    base_dataset: String,
    columns: Vec<String>,
    #[serde(default)]
    row_filter: Option<String>,
    #[serde(default)]
    column_redactions: std::collections::HashMap<String, shared::types::Redaction>,
    #[serde(default)]
    description: String,
    #[serde(default)]
    created_by: String,
}

async fn create_view(
    State(registry): State<Registry>,
    Json(req): Json<CreateViewRequest>,
) -> impl IntoResponse {
    let view = shared::types::AiView {
        name: req.name,
        base_dataset: req.base_dataset,
        columns: req.columns,
        row_filter: req.row_filter,
        column_redactions: req.column_redactions,
        created_at: chrono::Utc::now(),
        created_by: req.created_by,
        description: req.description,
    };
    match registry.put_view(view).await {
        Ok(v) => Ok((StatusCode::CREATED, Json(v))),
        Err(e) => Err((StatusCode::BAD_REQUEST, e)),
    }
}

async fn list_views(State(registry): State<Registry>) -> impl IntoResponse {
    Json(registry.list_views().await)
}

async fn get_view(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    match registry.get_view(&name).await {
        Some(v) => Ok(Json(v)),
        None => Err((StatusCode::NOT_FOUND, format!("view not found: {name}"))),
    }
}

async fn delete_view(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    match registry.delete_view(&name).await {
        Ok(()) => Ok(StatusCode::NO_CONTENT),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

// --- Phase E: soft-delete tombstones ---

#[derive(Deserialize)]
struct TombstoneRequest {
    row_key_column: String,
    row_key_values: Vec<String>,
    #[serde(default)]
    actor: String,
    #[serde(default)]
    reason: String,
}

#[derive(Serialize)]
struct TombstoneResponse {
    dataset: String,
    row_key_column: String,
    rows_tombstoned: usize,
    failures: Vec<String>,
}

async fn tombstone_rows(
    State(registry): State<Registry>,
    Path(name): Path<String>,
    Json(req): Json<TombstoneRequest>,
) -> impl IntoResponse {
    if req.row_key_values.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "row_key_values is empty".to_string()));
    }

    let mut ok = 0;
    let mut failures = Vec::new();
    for value in &req.row_key_values {
        match registry
            .add_tombstone(&name, &req.row_key_column, value, &req.actor, &req.reason)
            .await
        {
            Ok(_) => ok += 1,
            Err(e) => failures.push(format!("{value}: {e}")),
        }
    }

    let status = if ok > 0 && failures.is_empty() {
        StatusCode::CREATED
    } else if ok > 0 {
        StatusCode::MULTI_STATUS
    } else {
        StatusCode::BAD_REQUEST
    };

    Ok((status, Json(TombstoneResponse {
        dataset: name,
        row_key_column: req.row_key_column,
        rows_tombstoned: ok,
        failures,
    })))
}

async fn list_tombstones(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    match registry.list_tombstones(&name).await {
        Ok(ts) => Ok(Json(ts)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

// --- Phase 17: Model profiles ---

#[derive(Deserialize)]
struct CreateProfileRequest {
    id: String,
    ollama_name: String,
    #[serde(default)]
    description: String,
    bound_datasets: Vec<String>,
    #[serde(default)]
    hnsw_config: shared::types::ProfileHnswConfig,
    #[serde(default = "default_embed_model_req")]
    embed_model: String,
    #[serde(default)]
    created_by: String,
    /// Federation: optional per-profile bucket (`profile:{id}` by convention).
    /// Omitting keeps artifacts in primary.
    #[serde(default)]
    bucket: Option<String>,
    /// ADR-019 hybrid: which vector backend to route this profile's
    /// indexes to. Defaults to Parquet+HNSW.
    #[serde(default)]
    vector_backend: shared::types::VectorBackend,
}

fn default_embed_model_req() -> String { "nomic-embed-text".to_string() }

async fn create_profile(
    State(registry): State<Registry>,
    Json(req): Json<CreateProfileRequest>,
) -> impl IntoResponse {
    let profile = shared::types::ModelProfile {
        id: req.id,
        ollama_name: req.ollama_name,
        description: req.description,
        bound_datasets: req.bound_datasets,
        hnsw_config: req.hnsw_config,
        embed_model: req.embed_model,
        created_at: chrono::Utc::now(),
        created_by: req.created_by,
        bucket: req.bucket,
        vector_backend: req.vector_backend,
    };
    match registry.put_profile(profile).await {
        Ok(p) => Ok((StatusCode::CREATED, Json(p))),
        Err(e) => Err((StatusCode::BAD_REQUEST, e)),
    }
}

async fn list_profiles(State(registry): State<Registry>) -> impl IntoResponse {
    Json(registry.list_profiles().await)
}

async fn get_profile(
    State(registry): State<Registry>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match registry.get_profile(&id).await {
        Some(p) => Ok(Json(p)),
        None => Err((StatusCode::NOT_FOUND, format!("profile not found: {id}"))),
    }
}

async fn delete_profile(
    State(registry): State<Registry>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match registry.delete_profile(&id).await {
        Ok(()) => Ok(StatusCode::NO_CONTENT),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
```