Phases 16.2 + L2 + 17 VRAM gate + MySQL + 18 Lance hybrid milestone

Five threads of work landing as one milestone — all individually
verified end-to-end against real data, full release build clean,
46 unit tests pass.

## Phase 16.2 / 16.5 — autotune agent + ingest triggers

`vectord::agent` is a long-running tokio task that watches the trial
journal and autonomously proposes + runs new HNSW configs. Distinct
from `autotune::run_autotune` (synchronous one-shot grid). Triggered
on POST /vectors/agent/enqueue/{idx} or by the periodic wake; ingest
paths now push DatasetAppended events when an index's source dataset
gets re-ingested. Rate-limited (max_trials_per_hour) and cooldown-
gated so it can't saturate Ollama under live load.
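A std-only sketch of the trials-per-hour gate (names illustrative, not the crate's actual API):

```rust
// Hypothetical sketch of the max_trials_per_hour gate: refuse a new trial
// when the trailing hour already holds the configured number of starts.
fn may_start_trial(recent_starts_secs: &[u64], now_secs: u64, max_per_hour: usize) -> bool {
    let in_window = recent_starts_secs
        .iter()
        .filter(|&&t| now_secs.saturating_sub(t) < 3600)
        .count();
    in_window < max_per_hour
}
```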

The proposer is ε-greedy around the current champion: with prob 0.25
sample random from full bounds, otherwise perturb champion ± small
delta on both axes. Dedup against history. Deterministic — RNG seeded
from history.len() so the same journal state proposes the same next
config (helps offline replay debugging).
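The proposer can be sketched as below; this is a hypothetical std-only reconstruction (the real code lives in vectord::agent, and the bounds, deltas, and RNG here are illustrative), but it shows the determinism property: same journal length, same seed, same next config.

```rust
// Hypothetical sketch of the ε-greedy proposer. Deterministic: the RNG seed
// is history.len(), so the same journal state proposes the same next config.
#[derive(Clone, Copy, Debug, PartialEq)]
struct HnswConfig {
    m: u32,
    ef_construction: u32,
}

const M_BOUNDS: (u32, u32) = (4, 64);
const EF_BOUNDS: (u32, u32) = (32, 512);

// Tiny LCG so the sketch stays std-only.
struct Lcg(u64);
impl Lcg {
    fn next(&mut self) -> u64 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        self.0
    }
    fn in_range(&mut self, lo: u32, hi: u32) -> u32 {
        lo + (self.next() % u64::from(hi - lo + 1)) as u32
    }
}

fn propose(champion: HnswConfig, history: &[HnswConfig]) -> HnswConfig {
    let mut rng = Lcg(history.len() as u64);
    loop {
        let cand = if rng.next() % 4 == 0 {
            // ε = 0.25: sample uniformly from the full bounds.
            HnswConfig {
                m: rng.in_range(M_BOUNDS.0, M_BOUNDS.1),
                ef_construction: rng.in_range(EF_BOUNDS.0, EF_BOUNDS.1),
            }
        } else {
            // Otherwise perturb the champion by a small delta on both axes.
            let dm = if rng.next() % 2 == 0 { -4i64 } else { 4 };
            let de = if rng.next() % 2 == 0 { -32i64 } else { 32 };
            HnswConfig {
                m: (champion.m as i64 + dm).clamp(M_BOUNDS.0 as i64, M_BOUNDS.1 as i64) as u32,
                ef_construction: (champion.ef_construction as i64 + de)
                    .clamp(EF_BOUNDS.0 as i64, EF_BOUNDS.1 as i64) as u32,
            }
        };
        // Dedup against history before returning.
        if !history.contains(&cand) {
            return cand;
        }
    }
}
```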

`[agent]` config section in lakehouse.toml; opt-in via enabled=true.
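The section's fields mirror vectord::agent::AgentConfig; a sketch with illustrative values:

```toml
# Opt-in; field names follow vectord::agent::AgentConfig, values illustrative
[agent]
enabled = true
cycle_interval_secs = 300
cooldown_between_trials_secs = 600
min_recall = 0.90
max_trials_per_hour = 4
```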

## Federation Layer 2 — runtime bucket lifecycle + per-index scoping

`BucketRegistry.buckets` moved to `std::sync::RwLock<HashMap>` so
buckets can be added/removed after startup. POST /storage/buckets
provisions at runtime; DELETE /storage/buckets/{name} unregisters
(refuses primary/rescue with 403). Local-backend buckets get their
root directory auto-created.
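A trimmed sketch of the registry shape (the real struct holds object-store handles, not plain strings; the 403 lives in the HTTP layer):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical sketch: interior RwLock lets buckets be provisioned and
// unregistered after startup; primary/rescue removal is refused.
struct BucketRegistry {
    buckets: RwLock<HashMap<String, String>>, // bucket name -> root URI
}

impl BucketRegistry {
    fn provision(&self, name: &str, root: &str) {
        self.buckets
            .write()
            .unwrap()
            .insert(name.to_string(), root.to_string());
    }
    fn unregister(&self, name: &str) -> Result<(), String> {
        if name == "primary" || name == "rescue" {
            return Err(format!("refusing to unregister protected bucket '{name}'"));
        }
        self.buckets.write().unwrap().remove(name);
        Ok(())
    }
}
```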

`IndexMeta.bucket` (default "primary" via serde) records each index's
home bucket. `TrialJournal` and `PromotionRegistry` now hold
Arc<BucketRegistry> + IndexRegistry; they resolve target store per-
index via IndexMeta.bucket. PromotionRegistry::list_all scans every
bucket and dedups by index_name. Pre-federation indexes keep working
unchanged — they just default to primary.
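The resolver pattern, sketched with plain maps (hypothetical; the real journal asks IndexRegistry for IndexMeta and BucketRegistry for the store):

```rust
use std::collections::HashMap;

// Hypothetical sketch of per-index resolution: look up the index's home
// bucket, falling back to "primary" for pre-federation metadata (the serde
// default), then resolve that bucket name to a store root.
fn resolve_store_root<'a>(
    index_buckets: &HashMap<String, String>,   // index_name -> IndexMeta.bucket
    bucket_roots: &'a HashMap<String, String>, // bucket name -> root URI
    index_name: &str,
) -> Option<&'a String> {
    let bucket = index_buckets
        .get(index_name)
        .map(String::as_str)
        .unwrap_or("primary"); // pre-federation indexes keep working
    bucket_roots.get(bucket)
}
```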

`ModelProfile.bucket: Option<String>` declares per-profile artifact
home. POST /vectors/profile/{id}/activate auto-provisions the
profile's bucket under storage.profile_root if not yet registered.

EvalSets stay primary-only for now — noted gap, low-risk to extend
later with the same resolver pattern.

## Phase 17 — VRAM-aware two-profile gate

Sidecar gains POST /admin/unload (Ollama keep_alive=0 trick — forces
immediate VRAM release), POST /admin/preload (keep_alive=5m with
empty prompt, takes the slot warm), and GET /admin/vram (combines
nvidia-smi snapshot with Ollama /api/ps). Exposed via aibridge as
unload_model / preload_model / vram_snapshot.

`VectorState.active_profile` is the GPU-slot singleton —
Arc<RwLock<Option<ActiveProfileSlot>>>. activate_profile checks for
a previous profile with a different ollama_name and unloads it
before preloading the new one; same-model reactivations skip the
unload (Ollama no-ops). New routes: POST
/vectors/profile/{id}/deactivate (unload + clear slot) and GET
/vectors/profile/active.
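The swap decision reduces to one comparison; a sketch with types trimmed to the decision (names illustrative, real ones live in vectord::service):

```rust
// Hypothetical sketch of activate_profile's unload check: unload fires only
// when the previous slot holds a different Ollama model; same-model
// reactivation skips it (Ollama no-ops anyway).
struct ActiveProfileSlot {
    profile_id: String,
    ollama_name: String,
}

/// Returns the model name to unload before preloading `next`, if any.
fn model_to_unload(prev: Option<&ActiveProfileSlot>, next: &ActiveProfileSlot) -> Option<String> {
    match prev {
        Some(p) if p.ollama_name != next.ollama_name => Some(p.ollama_name.clone()),
        _ => None,
    }
}
```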

Verified live: staffing-recruiter (qwen2.5) → docs-assistant
(mistral) swap freed qwen2.5 from VRAM and loaded mistral. nomic-
embed-text persists across swaps because both profiles use it —
free optimization that fell out of the design. Scoped search
correctly 403s cross-profile in both directions.

## MySQL streaming connector

`crates/ingestd/src/my_stream.rs` mirrors pg_stream.rs for MySQL.
Pure-Rust `mysql_async` driver (default-features=false to avoid C
deps). Same OFFSET pagination, same Parquet-streaming write shape.
Type mapping per ADR-010: int/bigint → Int32/Int64, decimal/float
→ Float64, tinyint(1)/bool → Boolean, everything else → Utf8 with
fallback parsers for date/time/json/uuid via Display.

POST /ingest/mysql parallel to /ingest/db. Same PII auto-detection,
same lineage capture (source_system="mysql"), same agent-trigger
hook. `redact_dsn` generalized — was hardcoded to "postgresql://"
length, now works for any scheme://user:pass@host/path URL (latent
PII leak fix for MySQL DSNs).
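The generalized helper can be sketched like this (a hypothetical reconstruction, not the actual ingestd code): find the scheme separator instead of assuming a fixed prefix, then mask everything up to the last `@` before the path.

```rust
// Hypothetical sketch of the generalized redact_dsn: works for any
// scheme://user:pass@host/path URL instead of assuming "postgresql://".
fn redact_dsn(dsn: &str) -> String {
    let Some(scheme_end) = dsn.find("://") else {
        return dsn.to_string();
    };
    let rest = &dsn[scheme_end + 3..];
    // Credentials end at the last '@' before the path, per URL rules.
    let path_start = rest.find('/').unwrap_or(rest.len());
    match rest[..path_start].rfind('@') {
        Some(at) => format!("{}://***@{}", &dsn[..scheme_end], &rest[at + 1..]),
        None => dsn.to_string(), // no credentials present
    }
}
```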

Verified live against MariaDB on localhost: 10 rows × 9 columns of
test data round-tripped through datatypes int/varchar/decimal/
tinyint/datetime/text. PII detection auto-flagged name + email.
Aggregation queries through DataFusion match the source values
exactly.

## Phase 18 — Hybrid Parquet+HNSW ⊕ Lance backend (ADR-019)

`vectord-lance` is a new firewall crate. Lance pulls Arrow 57 and
DataFusion 52 — incompatible with the rest of the workspace's
Arrow 55 / DataFusion 47. The firewall isolates that dep tree:
public API uses only std types (Vec<f32>, Vec<String>, Hit, Row,
*Stats), so no Arrow types cross the crate boundary and nothing
propagates to vectord. This is the ADR-019 integration path that
hadn't shipped until now.
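A std-only sketch of the boundary idea (not the crate's real signatures; a dot-product scan stands in for the Lance-internal search):

```rust
// Hypothetical sketch of the firewall: the public surface speaks only std
// types (Vec<f32>, String, plain structs), so the Arrow 57 / DataFusion 52
// types used *inside* vectord-lance never appear in a caller-visible
// signature and can't force a workspace-wide upgrade.
pub struct Hit {
    pub doc_id: String,
    pub score: f32,
}

// Stand-in for the crate-internal search; the real one scans a Lance dataset.
pub fn search(vectors: &[(String, Vec<f32>)], query: &[f32], k: usize) -> Vec<Hit> {
    let mut hits: Vec<Hit> = vectors
        .iter()
        .map(|(id, v)| {
            let score: f32 = v.iter().zip(query).map(|(a, b)| a * b).sum();
            Hit { doc_id: id.clone(), score }
        })
        .collect();
    hits.sort_by(|a, b| b.score.total_cmp(&a.score));
    hits.truncate(k);
    hits
}
```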

`vectord::lance_backend::LanceRegistry` lazy-creates a
LanceVectorStore per index, resolving bucket → URI via the
conventional local-bucket layout. `IndexMeta.vector_backend` and
`ModelProfile.vector_backend` carry the choice (default Parquet so
existing indexes unchanged).
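Lazy creation reduces to a cached map keyed by index name; a sketch (the URI layout and names here are illustrative, and the real LanceRegistry wraps this in locks and hands back LanceVectorStore handles):

```rust
use std::collections::HashMap;

// Hypothetical sketch of lazy per-index store creation: first access for an
// index resolves bucket root -> dataset URI and caches it; later accesses
// reuse the cached entry.
struct LanceRegistry {
    stores: HashMap<String, String>, // index_name -> resolved dataset URI
}

impl LanceRegistry {
    fn get_or_create(&mut self, index_name: &str, bucket_root: &str) -> &String {
        self.stores
            .entry(index_name.to_string())
            .or_insert_with(|| format!("{bucket_root}/lance/{index_name}.lance"))
    }
}
```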

Six routes under /vectors/lance/*:
- migrate/{idx}: convert binary-blob Parquet → Lance FixedSizeList
- index/{idx}: build IVF_PQ
- search/{idx}: vector search (embed via sidecar)
- doc/{idx}/{doc_id}: random row fetch
- append/{idx}: native fragment append
- stats/{idx}: row count + index presence

Verified live on the real resumes_100k_v2 corpus (100K × 768d):
- Migrate: 0.57s
- Build IVF_PQ index: 16.2s (matches ADR-019 bench; 14× faster than
  HNSW's 230s for the same data)
- Search end-to-end (Ollama embed + Lance scan): 23-53ms
- Random doc_id fetch: 5-7ms (filter scan; faster than Parquet's
  ~35ms full-file scan, slower than the bench's 311us positional
  take — would close that gap with a scalar btree on doc_id)
- Append 100 rows: 3.3ms / +320KB on disk vs Parquet's required
  full ~330MB rewrite — the structural win
- Index survives append; both backends coexist cleanly

## Known follow-ups not in this milestone

- ModelProfile.vector_backend doesn't yet auto-route
  /vectors/profile/{id}/search to Lance; callers go through
  /vectors/lance/* directly
- Scalar btree on doc_id (closes the 5-7ms → ~300us gap)
- vectord-lance built default-features=false → no S3 yet
- IVF_PQ recall not measured (ADR-019 caveat) — needs a Lance-aware
  variant of the eval harness
- Watcher-path ingest doesn't push agent triggers (HTTP paths do)
- EvalSets still primary-only (federation gap)
- No PATCH endpoint to move an existing index between buckets
- The storaged::append_log doctest fails to compile (malformed
  `{prefix}/` parses as a code fence); pre-existing bug, left for a
  focused fix

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
root 2026-04-16 20:24:46 -05:00
parent 4e1c400f5d
commit 0d037cfac1
28 changed files with 3240 additions and 111 deletions

Cargo.lock (generated, 273 changed lines)

@ -262,7 +262,7 @@ dependencies = [
"arrow-schema 55.2.0",
"arrow-select 55.2.0",
"atoi",
"base64",
"base64 0.22.1",
"chrono",
"comfy-table",
"half",
@ -284,7 +284,7 @@ dependencies = [
"arrow-schema 57.3.0",
"arrow-select 57.3.0",
"atoi",
"base64",
"base64 0.22.1",
"chrono",
"comfy-table",
"half",
@ -738,6 +738,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "base64"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.1"
@ -757,6 +763,24 @@ dependencies = [
"num-traits",
]
[[package]]
name = "bindgen"
version = "0.72.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895"
dependencies = [
"bitflags",
"cexpr",
"clang-sys",
"itertools 0.13.0",
"proc-macro2",
"quote",
"regex",
"rustc-hash 2.1.1",
"shlex",
"syn 2.0.117",
]
[[package]]
name = "bitflags"
version = "2.11.0"
@ -871,6 +895,15 @@ dependencies = [
"alloc-stdlib",
]
[[package]]
name = "btoi"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd6407f73a9b8b6162d8a2ef999fe6afd7cc15902ebf42c5cd296addf17e0ad"
dependencies = [
"num-traits",
]
[[package]]
name = "bumpalo"
version = "3.20.2"
@ -976,6 +1009,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom 7.1.3",
]
[[package]]
name = "cfb"
version = "0.7.3"
@ -1005,7 +1047,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1f927b07c74ba84c7e5fe4db2baeb3e996ab2688992e39ac68ce3220a677c7e"
dependencies = [
"base64",
"base64 0.22.1",
"encoding_rs",
]
@ -1070,6 +1112,26 @@ dependencies = [
"inout",
]
[[package]]
name = "clang-sys"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "cmake"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678"
dependencies = [
"cc",
]
[[package]]
name = "combine"
version = "4.6.7"
@ -1283,6 +1345,19 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
@ -1638,7 +1713,7 @@ dependencies = [
"ahash",
"arrow 55.2.0",
"arrow-ipc 55.2.0",
"base64",
"base64 0.22.1",
"half",
"hashbrown 0.14.5",
"indexmap",
@ -2041,7 +2116,7 @@ checksum = "2ddf0a0a2db5d2918349c978d42d80926c6aa2459cd8a3c533a84ec4bb63479e"
dependencies = [
"arrow 55.2.0",
"arrow-buffer 55.2.0",
"base64",
"base64 0.22.1",
"blake2",
"blake3",
"chrono",
@ -2070,7 +2145,7 @@ checksum = "07356c94118d881130dd0ffbff127540407d969c8978736e324edcd6c41cd48f"
dependencies = [
"arrow 57.3.0",
"arrow-buffer 57.3.0",
"base64",
"base64 0.22.1",
"blake2",
"blake3",
"chrono",
@ -2867,7 +2942,7 @@ dependencies = [
"async-tungstenite",
"axum",
"axum-core",
"base64",
"base64 0.22.1",
"bytes",
"ciborium",
"const-str",
@ -2921,7 +2996,7 @@ checksum = "cda8b152e85121243741b9d5f2a3d8cb3c47a7b2299e902f98b6a7719915b0a2"
dependencies = [
"anyhow",
"axum-core",
"base64",
"base64 0.22.1",
"ciborium",
"dioxus-core",
"dioxus-document",
@ -3325,6 +3400,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c"
dependencies = [
"crc32fast",
"libz-sys",
"miniz_oxide",
"zlib-rs",
]
@ -3705,7 +3781,7 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb"
dependencies = [
"base64",
"base64 0.22.1",
"bytes",
"headers-core",
"http",
@ -3867,7 +3943,7 @@ version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0"
dependencies = [
"base64",
"base64 0.22.1",
"bytes",
"futures-channel",
"futures-util",
@ -4063,6 +4139,7 @@ dependencies = [
"chrono",
"csv",
"lopdf",
"mysql_async",
"object_store",
"parquet 55.2.0",
"serde",
@ -4074,6 +4151,7 @@ dependencies = [
"tokio-postgres",
"tracing",
"uuid",
"vectord",
]
[[package]]
@ -4306,6 +4384,15 @@ dependencies = [
"bitflags",
]
[[package]]
name = "keyed_priority_queue"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d"
dependencies = [
"indexmap",
]
[[package]]
name = "lance"
version = "4.0.0"
@ -4650,7 +4737,7 @@ dependencies = [
"tempfile",
"tokio",
"tracing",
"twox-hash",
"twox-hash 2.1.2",
"uuid",
]
@ -4890,6 +4977,17 @@ dependencies = [
"libc",
]
[[package]]
name = "libz-sys"
version = "1.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
@ -5012,7 +5110,7 @@ version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a"
dependencies = [
"twox-hash",
"twox-hash 2.1.2",
]
[[package]]
@ -5021,7 +5119,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746"
dependencies = [
"twox-hash",
"twox-hash 2.1.2",
]
[[package]]
@ -5255,6 +5353,68 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2195bf6aa996a481483b29d62a7663eed3fe39600c460e323f8ff41e90bdd89b"
[[package]]
name = "mysql_async"
version = "0.34.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0b66e411c31265e879d9814d03721f2daa7ad07337b6308cb4bb0cde7e6fd47"
dependencies = [
"bytes",
"crossbeam",
"flate2",
"futures-core",
"futures-sink",
"futures-util",
"keyed_priority_queue",
"lru",
"mysql_common",
"pem",
"percent-encoding",
"pin-project",
"rand 0.8.5",
"serde",
"serde_json",
"socket2 0.5.10",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"twox-hash 1.6.3",
"url",
]
[[package]]
name = "mysql_common"
version = "0.32.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478b0ff3f7d67b79da2b96f56f334431aef65e15ba4b29dd74a4236e29582bdc"
dependencies = [
"base64 0.21.7",
"bindgen",
"bitflags",
"btoi",
"byteorder",
"bytes",
"cc",
"cmake",
"crc32fast",
"flate2",
"lazy_static",
"num-bigint",
"num-traits",
"rand 0.8.5",
"regex",
"saturating",
"serde",
"serde_json",
"sha1",
"sha2",
"smallvec",
"subprocess",
"thiserror 1.0.69",
"uuid",
"zstd",
]
[[package]]
name = "ndarray"
version = "0.16.1"
@ -5485,7 +5645,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00"
dependencies = [
"async-trait",
"base64",
"base64 0.22.1",
"bytes",
"chrono",
"form_urlencoded",
@ -5667,7 +5827,7 @@ dependencies = [
"arrow-ipc 55.2.0",
"arrow-schema 55.2.0",
"arrow-select 55.2.0",
"base64",
"base64 0.22.1",
"brotli",
"bytes",
"chrono",
@ -5685,7 +5845,7 @@ dependencies = [
"snap",
"thrift",
"tokio",
"twox-hash",
"twox-hash 2.1.2",
"zstd",
]
@ -5703,7 +5863,7 @@ dependencies = [
"arrow-ipc 57.3.0",
"arrow-schema 57.3.0",
"arrow-select 57.3.0",
"base64",
"base64 0.22.1",
"brotli",
"bytes",
"chrono",
@ -5719,7 +5879,7 @@ dependencies = [
"simdutf8",
"snap",
"thrift",
"twox-hash",
"twox-hash 2.1.2",
"zstd",
]
@ -5741,6 +5901,16 @@ dependencies = [
"stfu8",
]
[[package]]
name = "pem"
version = "3.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be"
dependencies = [
"base64 0.22.1",
"serde_core",
]
[[package]]
name = "percent-encoding"
version = "2.3.2"
@ -5871,7 +6041,7 @@ version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee9dd5fe15055d2b6806f4736aa0c9637217074e224bbec46d4041b91bb9491"
dependencies = [
"base64",
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
@ -6112,6 +6282,7 @@ dependencies = [
"datafusion 47.0.0",
"futures",
"object_store",
"parquet 55.2.0",
"serde",
"serde_json",
"shared",
@ -6427,7 +6598,7 @@ version = "0.12.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147"
dependencies = [
"base64",
"base64 0.22.1",
"bytes",
"cookie",
"cookie_store",
@ -6618,6 +6789,12 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "saturating"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71"
[[package]]
name = "schannel"
version = "0.1.29"
@ -7039,6 +7216,12 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "std_prelude"
version = "0.2.12"
@ -7106,6 +7289,16 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "subprocess"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c56e8662b206b9892d7a5a3f2ecdbcb455d3d6b259111373b7e08b8055158a8"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "subsecond"
version = "0.7.3"
@ -7196,7 +7389,7 @@ checksum = "64a966cb0e76e311f09cf18507c9af192f15d34886ee43d7ba7c7e3803660c43"
dependencies = [
"aho-corasick",
"arc-swap",
"base64",
"base64 0.22.1",
"bitpacking",
"bon",
"byteorder",
@ -7655,7 +7848,7 @@ checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"axum",
"base64",
"base64 0.22.1",
"bytes",
"h2",
"http",
@ -7872,6 +8065,17 @@ dependencies = [
"utf-8",
]
[[package]]
name = "twox-hash"
version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if",
"rand 0.8.5",
"static_assertions",
]
[[package]]
name = "twox-hash"
version = "2.1.2"
@ -8004,6 +8208,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vectord"
version = "0.1.0"
@ -8012,6 +8222,7 @@ dependencies = [
"arrow 55.2.0",
"axum",
"bytes",
"catalogd",
"chrono",
"instant-distance",
"object_store",
@ -8023,6 +8234,24 @@ dependencies = [
"tokio",
"tracing",
"uuid",
"vectord-lance",
]
[[package]]
name = "vectord-lance"
version = "0.1.0"
dependencies = [
"arrow 57.3.0",
"arrow-array 57.3.0",
"arrow-schema 57.3.0",
"bytes",
"futures",
"lance",
"lance-index",
"lance-linalg",
"parquet 57.3.0",
"serde",
"tokio",
]
[[package]]


@ -13,6 +13,7 @@ members = [
"crates/gateway",
"crates/ui",
"crates/lance-bench",
"crates/vectord-lance",
]
[workspace.dependencies]
@ -47,3 +48,4 @@ lopdf = "0.35"
encoding_rs = "0.8"
instant-distance = "0.6"
tokio-postgres = { version = "0.7", features = ["with-serde_json-1", "with-chrono-0_4", "with-uuid-1"] }
mysql_async = { version = "0.34", default-features = false, features = ["minimal"] }


@ -134,4 +134,47 @@ impl AiClient {
}
resp.json().await.map_err(|e| format!("rerank parse error: {e}"))
}
/// Force Ollama to unload the named model from VRAM (keep_alive=0).
/// Used for predictable profile swaps — without this, Ollama holds a
/// model for its configured TTL (default 5min) and the previous
/// profile's model can linger in VRAM next to the new one.
pub async fn unload_model(&self, model: &str) -> Result<serde_json::Value, String> {
let resp = self.client
.post(format!("{}/admin/unload", self.base_url))
.json(&serde_json::json!({ "model": model }))
.send().await
.map_err(|e| format!("unload request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("unload error: {text}"));
}
resp.json().await.map_err(|e| format!("unload parse error: {e}"))
}
/// Ask Ollama to load the named model into VRAM proactively. Makes
/// the first real request after profile activation fast (no cold-load
/// latency).
pub async fn preload_model(&self, model: &str) -> Result<serde_json::Value, String> {
let resp = self.client
.post(format!("{}/admin/preload", self.base_url))
.json(&serde_json::json!({ "model": model }))
.send().await
.map_err(|e| format!("preload request failed: {e}"))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(format!("preload error: {text}"));
}
resp.json().await.map_err(|e| format!("preload parse error: {e}"))
}
/// GPU + loaded-model snapshot from the sidecar. Combines nvidia-smi
/// output (if available) with Ollama's /api/ps.
pub async fn vram_snapshot(&self) -> Result<serde_json::Value, String> {
let resp = self.client
.get(format!("{}/admin/vram", self.base_url))
.send().await
.map_err(|e| format!("vram request failed: {e}"))?;
resp.json().await.map_err(|e| format!("vram parse error: {e}"))
}
}


@ -16,9 +16,17 @@ pub fn router(client: AiClient) -> Router {
.route("/embed", post(embed))
.route("/generate", post(generate))
.route("/rerank", post(rerank))
.route("/vram", get(vram))
.with_state(client)
}
async fn vram(State(client): State<AiClient>) -> impl IntoResponse {
match client.vram_snapshot().await {
Ok(snap) => Ok(Json(snap)),
Err(e) => Err((StatusCode::BAD_GATEWAY, e)),
}
}
async fn health(State(client): State<AiClient>) -> impl IntoResponse {
match client.health().await {
Ok(info) => Ok(Json(info)),


@ -357,6 +357,14 @@ struct CreateProfileRequest {
embed_model: String,
#[serde(default)]
created_by: String,
/// Federation: optional per-profile bucket (`profile:{id}` by convention).
/// Omitting keeps artifacts in primary.
#[serde(default)]
bucket: Option<String>,
/// ADR-019 hybrid: which vector backend to route this profile's
/// indexes to. Defaults to Parquet+HNSW.
#[serde(default)]
vector_backend: shared::types::VectorBackend,
}
fn default_embed_model_req() -> String { "nomic-embed-text".to_string() }
@ -374,6 +382,8 @@ async fn create_profile(
embed_model: req.embed_model,
created_at: chrono::Utc::now(),
created_by: req.created_by,
bucket: req.bucket,
vector_backend: req.vector_backend,
};
match registry.put_profile(profile).await {
Ok(p) => Ok((StatusCode::CREATED, Json(p))),


@ -75,6 +75,41 @@ async fn main() {
// AI sidecar client
let ai_client = aibridge::client::AiClient::new(&config.sidecar.url);
// Vector service components — built before the router because both the
// /vectors service AND ingestd need the agent handle to enqueue triggers.
let index_reg = vectord::index_registry::IndexRegistry::new(store.clone());
let _ = index_reg.rebuild().await;
let hnsw = vectord::hnsw::HnswStore::new();
let emb_cache = vectord::embedding_cache::EmbeddingCache::new(store.clone());
// Phase B (federation layer 2): trial journals + promotion files now
// live in each index's recorded bucket (IndexMeta.bucket), not always
// primary. Journal / registry resolve per-call via the bucket registry.
let tj = vectord::trial::TrialJournal::new(bucket_registry.clone(), index_reg.clone());
let pr = vectord::promotion::PromotionRegistry::new(bucket_registry.clone(), index_reg.clone());
// Phase 16.2: spawn the autotune agent. When config.agent.enabled=false
// this returns a handle that drops triggers silently — no surprise load.
let agent_cfg = vectord::agent::AgentConfig {
enabled: config.agent.enabled,
cycle_interval_secs: config.agent.cycle_interval_secs,
cooldown_between_trials_secs: config.agent.cooldown_between_trials_secs,
min_recall: config.agent.min_recall,
max_trials_per_hour: config.agent.max_trials_per_hour,
};
let agent_handle = vectord::agent::spawn(
agent_cfg,
vectord::agent::AgentDeps {
store: store.clone(),
ai_client: ai_client.clone(),
catalog: registry.clone(),
index_registry: index_reg.clone(),
hnsw_store: hnsw.clone(),
embedding_cache: emb_cache.clone(),
trial_journal: tj.clone(),
promotion_registry: pr.clone(),
},
);
// HTTP router
let mut app = Router::new()
.route("/health", get(health))
@ -87,21 +122,25 @@ async fn main() {
store: store.clone(),
registry: registry.clone(),
buckets: bucket_registry.clone(),
agent_handle: agent_handle.clone(),
index_registry: index_reg.clone(),
}))
.nest("/vectors", vectord::service::router({
let index_reg = vectord::index_registry::IndexRegistry::new(store.clone());
let _ = index_reg.rebuild().await;
vectord::service::VectorState {
store: store.clone(),
ai_client: ai_client.clone(),
job_tracker: vectord::jobs::JobTracker::new(),
index_registry: index_reg,
hnsw_store: vectord::hnsw::HnswStore::new(),
embedding_cache: vectord::embedding_cache::EmbeddingCache::new(store.clone()),
trial_journal: vectord::trial::TrialJournal::new(store.clone()),
catalog: registry.clone(),
promotion_registry: vectord::promotion::PromotionRegistry::new(store.clone()),
}
.nest("/vectors", vectord::service::router(vectord::service::VectorState {
store: store.clone(),
ai_client: ai_client.clone(),
job_tracker: vectord::jobs::JobTracker::new(),
index_registry: index_reg.clone(),
hnsw_store: hnsw,
embedding_cache: emb_cache,
trial_journal: tj,
catalog: registry.clone(),
promotion_registry: pr,
agent_handle,
bucket_registry: bucket_registry.clone(),
active_profile: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
lance: vectord::lance_backend::LanceRegistry::new(
bucket_registry.clone(), index_reg.clone(),
),
}))
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
.nest("/journal", journald::service::router(journal))


@ -7,6 +7,7 @@ edition = "2024"
shared = { path = "../shared" }
storaged = { path = "../storaged" }
catalogd = { path = "../catalogd" }
vectord = { path = "../vectord" }
tokio = { workspace = true }
axum = { workspace = true, features = ["multipart"] }
lopdf = { workspace = true }
@ -21,4 +22,5 @@ csv = { workspace = true }
chrono = { workspace = true }
object_store = { workspace = true }
tokio-postgres = { workspace = true }
mysql_async = { workspace = true }
uuid = { workspace = true }


@ -1,4 +1,5 @@
pub mod db_ingest;
pub mod my_stream;
pub mod pg_stream;
pub mod detect;
pub mod csv_ingest;


@ -0,0 +1,410 @@
/// Streaming MySQL ingest.
///
/// Mirrors `pg_stream` for MySQL sources. Same OFFSET-paginated strategy,
/// same Parquet-streaming shape. Uses `mysql_async` (pure-rust) so we
/// don't need a C client library at build time.
///
/// Type mapping follows ADR-010 (default to string on ambiguity):
/// - Booleans (TINYINT(1)) and integer types map to Arrow Int32/Int64
/// - Floating point and decimals → Float64
/// - Everything else (text, varchar, json, date, timestamp) → Utf8
///
/// What's deliberately not supported (yet):
/// - TLS connections — `minimal` feature is plain TCP only. Upgrade
/// when a tenant actually needs it.
/// - Keyset pagination — OFFSET scans are O(N²) at multi-million-row
/// scale; same caveat as `pg_stream`.
/// - BINARY/BLOB columns — currently rendered as base64 or empty string
/// via Display-fallback.
use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use mysql_async::prelude::*;
use mysql_async::{Conn, Opts, Row, Value};
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
/// Request shape for MySQL streaming ingest.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct MyStreamRequest {
/// mysql://user:pass@host:port/db
pub dsn: String,
pub table: String,
#[serde(default)]
pub dataset_name: Option<String>,
/// Rows per fetch. Default 10_000.
#[serde(default)]
pub batch_size: Option<usize>,
/// Column to ORDER BY for stable pagination. If omitted, the first
/// column returned by the schema probe is used.
#[serde(default)]
pub order_by: Option<String>,
/// Hard cap on total rows (for sampling / previews).
#[serde(default)]
pub limit: Option<usize>,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct MyStreamResult {
pub table: String,
pub rows: usize,
pub batches: usize,
pub columns: usize,
pub schema: Vec<String>,
pub parquet_bytes: u64,
pub duration_secs: f32,
}
/// Parsed DSN pieces. Kept local (rather than reusing pg's DbConfig) so
/// the MySQL connector doesn't depend on the PG path.
#[derive(Debug, Clone)]
pub struct MyConfig {
pub host: String,
pub port: u16,
pub user: String,
pub password: String,
pub database: String,
}
impl MyConfig {
/// Build a mysql_async `Opts` from the parsed config.
pub fn to_opts(&self) -> Opts {
let url = if self.password.is_empty() {
format!(
"mysql://{}@{}:{}/{}",
percent(&self.user), percent(&self.host), self.port, percent(&self.database),
)
} else {
format!(
"mysql://{}:{}@{}:{}/{}",
percent(&self.user), percent(&self.password),
percent(&self.host), self.port, percent(&self.database),
)
};
Opts::from_url(&url).expect("MyConfig.to_opts: rebuilt URL should parse")
}
}
/// Minimal URL-encoder for the few characters that commonly appear in
/// MySQL passwords. mysql_async's URL parser expects valid URL-encoded
/// components.
fn percent(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for c in s.chars() {
match c {
'@' | ':' | '/' | '?' | '#' | '[' | ']' | ' ' | '%' => {
out.push_str(&format!("%{:02X}", c as u32));
}
_ => out.push(c),
}
}
out
}
/// Parse a mysql:// DSN.
/// Supports: mysql://[user[:password]@]host[:port][/db]
pub fn parse_dsn(dsn: &str) -> Result<MyConfig, String> {
let rest = dsn
.strip_prefix("mysql://")
.ok_or_else(|| "DSN must start with mysql://".to_string())?;
let (auth_host, database) = match rest.split_once('/') {
Some((ah, db)) => (ah, db.split('?').next().unwrap_or(db).to_string()),
None => (rest, String::new()),
};
let (userpass, hostport) = match auth_host.rsplit_once('@') {
Some((up, hp)) => (Some(up), hp),
None => (None, auth_host),
};
let (user, password) = match userpass {
Some(up) => match up.split_once(':') {
Some((u, p)) => (u.to_string(), p.to_string()),
None => (up.to_string(), String::new()),
},
None => ("root".to_string(), String::new()),
};
let (host, port) = match hostport.rsplit_once(':') {
Some((h, p)) => (
h.to_string(),
p.parse::<u16>().map_err(|_| format!("invalid port in DSN: {p}"))?,
),
None => (hostport.to_string(), 3306),
};
if host.is_empty() {
return Err("DSN has no host".into());
}
if database.is_empty() {
return Err("DSN has no database path (mysql://... /db)".into());
}
Ok(MyConfig { host, port, user, password, database })
}
/// Stream a MySQL table as Parquet bytes.
pub async fn stream_table_to_parquet(
req: &MyStreamRequest,
) -> Result<(bytes::Bytes, MyStreamResult), String> {
let t0 = std::time::Instant::now();
let config = parse_dsn(&req.dsn)?;
let batch_size = req.batch_size.unwrap_or(10_000).max(1);
let mut conn = Conn::new(config.to_opts()).await
.map_err(|e| format!("mysql connect: {e}"))?;
// Probe columns via information_schema — gives real type names that
// we can map to Arrow dtypes without needing to fetch a row first.
let columns = probe_columns(&mut conn, &config.database, &req.table).await?;
if columns.is_empty() {
return Err(format!("table '{}' not found or has no columns", req.table));
}
let arrow_fields: Vec<Field> = columns
.iter()
.map(|(name, ty)| Field::new(name, mysql_type_to_arrow(ty), true))
.collect();
let schema = Arc::new(Schema::new(arrow_fields));
let schema_report: Vec<String> = columns
.iter()
.map(|(n, t)| format!("{}:{}", n, t))
.collect();
let order_col = req.order_by.clone().unwrap_or_else(|| columns[0].0.clone());
let mut buf: Vec<u8> = Vec::with_capacity(1024 * 1024);
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)
.map_err(|e| format!("ArrowWriter init: {e}"))?;
let mut total_rows: usize = 0;
let mut batch_count: usize = 0;
let row_cap = req.limit.unwrap_or(usize::MAX);
// Backticks are MySQL's identifier quote. Forbid backticks in the
// table and order_by column names up front to block identifier
// injection; the Postgres path rejects such names too.
if req.table.contains('`') || order_col.contains('`') {
return Err("table or order_by column contains backticks — refused".into());
}
loop {
let remaining = row_cap.saturating_sub(total_rows);
if remaining == 0 { break; }
let fetch = remaining.min(batch_size);
let sql = format!(
"SELECT * FROM `{}` ORDER BY `{}` LIMIT {} OFFSET {}",
req.table, order_col, fetch, total_rows,
);
let rows: Vec<Row> = conn.query(&sql).await
.map_err(|e| format!("fetch batch at offset {total_rows}: {e}"))?;
if rows.is_empty() { break; }
let n = rows.len();
let arrays: Vec<ArrayRef> = columns
.iter()
.enumerate()
.map(|(idx, (_, ty))| rows_to_column(&rows, idx, ty))
.collect::<Result<_, _>>()?;
let batch = RecordBatch::try_new(schema.clone(), arrays)
.map_err(|e| format!("RecordBatch: {e}"))?;
writer.write(&batch).map_err(|e| format!("ArrowWriter::write: {e}"))?;
total_rows += n;
batch_count += 1;
tracing::info!(
"mysql stream '{}': fetched batch {} ({} rows, total {})",
req.table, batch_count, n, total_rows,
);
if n < fetch { break; }
}
writer.close().map_err(|e| format!("ArrowWriter::close: {e}"))?;
conn.disconnect().await.ok();
let result = MyStreamResult {
table: req.table.clone(),
rows: total_rows,
batches: batch_count,
columns: columns.len(),
schema: schema_report,
parquet_bytes: buf.len() as u64,
duration_secs: t0.elapsed().as_secs_f32(),
};
Ok((bytes::Bytes::from(buf), result))
}
async fn probe_columns(
conn: &mut Conn,
schema: &str,
table: &str,
) -> Result<Vec<(String, String)>, String> {
let sql = format!(
"SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns \
WHERE table_schema = '{}' AND table_name = '{}' \
ORDER BY ORDINAL_POSITION",
schema.replace('\'', "''"),
table.replace('\'', "''"),
);
let rows: Vec<(String, String)> = conn.query(&sql).await
.map_err(|e| format!("probe columns: {e}"))?;
Ok(rows)
}
/// MySQL data type string → Arrow DataType. Conservative: anything we
/// don't recognize becomes Utf8 (ADR-010).
fn mysql_type_to_arrow(ty: &str) -> DataType {
match ty.to_lowercase().as_str() {
"tinyint" | "smallint" | "mediumint" | "int" | "integer" => DataType::Int32,
"bigint" => DataType::Int64,
"float" | "double" | "decimal" | "numeric" | "real" => DataType::Float64, // note: decimal/numeric lose exactness as f64
"bit" | "bool" | "boolean" => DataType::Boolean,
_ => DataType::Utf8,
}
}
/// Convert a single column slice of MySQL rows into an Arrow array.
fn rows_to_column(
rows: &[Row],
idx: usize,
ty: &str,
) -> Result<ArrayRef, String> {
let arrow_ty = mysql_type_to_arrow(ty);
match arrow_ty {
DataType::Boolean => {
let v: Vec<Option<bool>> = rows.iter().map(|r| cell_as_bool(r, idx)).collect();
Ok(Arc::new(BooleanArray::from(v)))
}
DataType::Int32 => {
let v: Vec<Option<i32>> = rows.iter().map(|r| cell_as_i64(r, idx).map(|n| n as i32)).collect();
Ok(Arc::new(Int32Array::from(v)))
}
DataType::Int64 => {
let v: Vec<Option<i64>> = rows.iter().map(|r| cell_as_i64(r, idx)).collect();
Ok(Arc::new(Int64Array::from(v)))
}
DataType::Float64 => {
let v: Vec<Option<f64>> = rows.iter().map(|r| cell_as_f64(r, idx)).collect();
Ok(Arc::new(Float64Array::from(v)))
}
_ => {
let v: Vec<Option<String>> = rows.iter().map(|r| cell_as_string(r, idx)).collect();
Ok(Arc::new(StringArray::from(v)))
}
}
}
fn cell(r: &Row, idx: usize) -> &Value { r.as_ref(idx).unwrap_or(&Value::NULL) }
fn cell_as_bool(r: &Row, idx: usize) -> Option<bool> {
match cell(r, idx) {
Value::NULL => None,
Value::Int(n) => Some(*n != 0),
Value::UInt(n) => Some(*n != 0),
Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| {
match s.to_ascii_lowercase().as_str() {
"true" | "1" | "y" | "yes" => Some(true),
"false" | "0" | "n" | "no" => Some(false),
_ => None,
}
}),
_ => None,
}
}
fn cell_as_i64(r: &Row, idx: usize) -> Option<i64> {
match cell(r, idx) {
Value::NULL => None,
Value::Int(n) => Some(*n),
Value::UInt(n) => i64::try_from(*n).ok(),
Value::Float(f) => Some(*f as i64),
Value::Double(f) => Some(*f as i64),
Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| s.parse().ok()),
_ => None,
}
}
fn cell_as_f64(r: &Row, idx: usize) -> Option<f64> {
match cell(r, idx) {
Value::NULL => None,
Value::Int(n) => Some(*n as f64),
Value::UInt(n) => Some(*n as f64),
Value::Float(f) => Some(*f as f64),
Value::Double(f) => Some(*f),
Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| s.parse().ok()),
_ => None,
}
}
fn cell_as_string(r: &Row, idx: usize) -> Option<String> {
match cell(r, idx) {
Value::NULL => None,
Value::Bytes(b) => Some(String::from_utf8_lossy(b).into_owned()),
Value::Int(n) => Some(n.to_string()),
Value::UInt(n) => Some(n.to_string()),
Value::Float(f) => Some(f.to_string()),
Value::Double(f) => Some(f.to_string()),
Value::Date(y, mo, d, h, mi, s, _us) => {
Some(format!("{:04}-{:02}-{:02} {:02}:{:02}:{:02}", y, mo, d, h, mi, s))
}
Value::Time(neg, days, h, mi, s, _us) => {
let sign = if *neg { "-" } else { "" };
Some(format!("{}{}d {:02}:{:02}:{:02}", sign, days, h, mi, s))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_dsn_full() {
let c = parse_dsn("mysql://daisy:secret@db.example.com:3307/my_db").unwrap();
assert_eq!(c.host, "db.example.com");
assert_eq!(c.port, 3307);
assert_eq!(c.user, "daisy");
assert_eq!(c.password, "secret");
assert_eq!(c.database, "my_db");
}
#[test]
fn parse_dsn_default_port() {
let c = parse_dsn("mysql://root@localhost/shop").unwrap();
assert_eq!(c.port, 3306);
assert_eq!(c.user, "root");
assert_eq!(c.password, "");
}
#[test]
fn parse_dsn_no_auth() {
let c = parse_dsn("mysql://127.0.0.1:3306/analytics").unwrap();
assert_eq!(c.user, "root");
assert_eq!(c.host, "127.0.0.1");
}
#[test]
fn parse_dsn_rejects_non_mysql() {
assert!(parse_dsn("postgresql://host/db").is_err());
}
#[test]
fn parse_dsn_requires_db() {
assert!(parse_dsn("mysql://localhost:3306").is_err());
}
#[test]
fn type_map_basics() {
assert_eq!(mysql_type_to_arrow("int"), DataType::Int32);
assert_eq!(mysql_type_to_arrow("BIGINT"), DataType::Int64);
assert_eq!(mysql_type_to_arrow("decimal"), DataType::Float64);
assert_eq!(mysql_type_to_arrow("varchar"), DataType::Utf8);
assert_eq!(mysql_type_to_arrow("json"), DataType::Utf8);
assert_eq!(mysql_type_to_arrow("bool"), DataType::Boolean);
}
}
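For reference, the LIMIT/OFFSET paging that `stream_table_to_parquet` performs can be sketched standalone. This is a minimal illustration of the emitted query sequence, not the real driver loop; the table and column names are made up.

```rust
// Sketch of the LIMIT/OFFSET pagination used by stream_table_to_parquet:
// each batch re-issues the same ORDER BY and skips rows already read.
fn batch_sql(table: &str, order_col: &str, fetch: usize, offset: usize) -> String {
    format!("SELECT * FROM `{table}` ORDER BY `{order_col}` LIMIT {fetch} OFFSET {offset}")
}

fn main() {
    let batch_size = 2;
    let total = 5; // pretend the table has 5 rows
    let mut offset = 0;
    let mut queries = Vec::new();
    while offset < total {
        let fetch = batch_size.min(total - offset);
        queries.push(batch_sql("orders", "id", fetch, offset));
        offset += fetch;
    }
    // Three batches: 2 + 2 + 1 rows.
    assert_eq!(queries.len(), 3);
    assert!(queries[2].ends_with("LIMIT 1 OFFSET 4"));
}
```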

---

@@ -11,7 +11,7 @@ use serde::Deserialize;
use std::sync::Arc;
use catalogd::registry::Registry;
use crate::{db_ingest, my_stream, pg_stream, pipeline};
use shared::arrow_helpers::record_batch_to_parquet;
use shared::types::{ObjectRef, SchemaFingerprint};
use storaged::ops;
@@ -23,6 +23,36 @@ pub struct IngestState {
pub registry: Registry,
/// Federation layer 2: lookup target bucket from request headers.
pub buckets: Arc<BucketRegistry>,
/// Phase 16.5: when ingest marks a dataset's embeddings stale, push
/// a `DatasetAppended` trigger so the autotune agent can schedule
/// re-trials. Holds the agent handle (no-op when agent disabled).
pub agent_handle: vectord::agent::AgentHandle,
/// Used to look up which HNSW indexes are attached to the
/// just-ingested dataset. Each matching index gets one trigger.
pub index_registry: vectord::index_registry::IndexRegistry,
}
/// Push `DatasetAppended` triggers for every HNSW index bound to this
/// dataset. Called after a successful ingest. Logs failures but never
/// fails the ingest — the agent is best-effort, not load-bearing.
async fn notify_agent_on_append(state: &IngestState, dataset_name: &str) {
let indexes = state.index_registry.list(Some(dataset_name), None).await;
if indexes.is_empty() {
return;
}
for meta in indexes {
let event = vectord::agent::TriggerEvent::dataset_appended(
meta.index_name.clone(), dataset_name,
);
if let Err(e) = state.agent_handle.enqueue(event).await {
tracing::warn!("agent enqueue failed for '{}': {}", meta.index_name, e);
} else {
tracing::info!(
"agent: enqueued DatasetAppended({}) for index '{}'",
dataset_name, meta.index_name,
);
}
}
}
/// Resolve the target bucket from `X-Lakehouse-Bucket` header.
@@ -55,6 +85,7 @@ pub fn router(state: IngestState) -> Router {
.route("/postgres/tables", post(list_pg_tables))
.route("/postgres/import", post(import_pg_table))
.route("/db", post(ingest_db_stream))
.route("/mysql", post(ingest_mysql_stream))
.with_state(state)
}
@@ -94,6 +125,9 @@ async fn ingest_file(
match pipeline::ingest_file_to_bucket(&filename, &content, dataset_name, &bucket, &store, &state.registry).await {
Ok(result) => {
if !result.deduplicated {
notify_agent_on_append(&state, &result.dataset_name).await;
}
if result.deduplicated {
Ok((StatusCode::OK, Json(result)))
} else {
@@ -309,6 +343,9 @@ async fn ingest_db_stream(
// index. No-op for newly-created datasets.
let _ = state.registry.mark_embeddings_stale(&dataset_name).await;
// Phase 16.5: notify autotune agent. No-op if no indexes attached.
notify_agent_on_append(&state, &dataset_name).await;
Ok((StatusCode::CREATED, Json(serde_json::json!({
"dataset_name": dataset_name,
"table": stream_result.table,
@@ -322,17 +359,119 @@
}))))
}
/// Streaming MySQL ingest. Mirrors `ingest_db_stream` — same pagination +
/// Parquet-footer schema recovery + PII/lineage metadata. Different driver,
/// otherwise identical.
async fn ingest_mysql_stream(
State(state): State<IngestState>,
headers: HeaderMap,
Json(req): Json<my_stream::MyStreamRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
tracing::info!(
"mysql stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}",
req.table, req.dataset_name, bucket, req.batch_size,
);
let (parquet, stream_result) = my_stream::stream_table_to_parquet(&req)
.await
.map_err(|e| (StatusCode::BAD_GATEWAY, e))?;
if stream_result.rows == 0 {
return Ok((StatusCode::OK, Json(serde_json::json!({
"table": req.table,
"rows": 0,
"message": "table is empty",
}))));
}
let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(&parquet)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("reparse parquet: {e}")))?;
let dataset_name = req.dataset_name.clone().unwrap_or_else(|| req.table.clone());
let storage_key = format!("datasets/{}.parquet", dataset_name);
let size_bytes = parquet.len() as u64;
ops::put(&store, &storage_key, parquet)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema);
let now = chrono::Utc::now();
state.registry.register(
dataset_name.clone(),
SchemaFingerprint(schema_fp.0),
vec![ObjectRef {
bucket: bucket.clone(),
key: storage_key.clone(),
size_bytes,
created_at: now,
}],
).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names);
let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| {
let sens = shared::pii::detect_sensitivity(f.name());
shared::types::ColumnMeta {
name: f.name().clone(),
data_type: f.data_type().to_string(),
sensitivity: sens.clone(),
description: String::new(),
is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)),
}
}).collect();
let lineage = shared::types::Lineage {
source_system: "mysql".to_string(),
source_file: format!("dsn: {}", redact_dsn(&req.dsn)),
ingest_job: format!("mysql-stream-{}", now.timestamp_millis()),
ingest_timestamp: now,
parent_datasets: vec![],
};
let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate {
sensitivity,
columns: Some(columns),
lineage: Some(lineage),
row_count: Some(stream_result.rows as u64),
..Default::default()
}).await;
// Phase C: mark stale + Phase 16.5: notify agent.
let _ = state.registry.mark_embeddings_stale(&dataset_name).await;
notify_agent_on_append(&state, &dataset_name).await;
Ok((StatusCode::CREATED, Json(serde_json::json!({
"dataset_name": dataset_name,
"table": stream_result.table,
"rows": stream_result.rows,
"batches": stream_result.batches,
"columns": stream_result.columns,
"schema": stream_result.schema,
"storage_key": storage_key,
"size_bytes": size_bytes,
"duration_secs": stream_result.duration_secs,
}))))
}
/// Redact the password in any `scheme://user:pass@host/...` DSN. Works
/// for postgres, mysql, and any other scheme using the same shape.
fn redact_dsn(dsn: &str) -> String {
let scheme_end = match dsn.find("://") {
Some(i) => i + 3,
None => return dsn.to_string(),
};
let at_idx = match dsn.rfind('@') {
Some(i) if i > scheme_end => i,
_ => return dsn.to_string(),
};
// Find the password separator (first `:` after scheme, before `@`).
let userpass = &dsn[scheme_end..at_idx];
let colon_offset = match userpass.find(':') {
Some(i) => i,
None => return dsn.to_string(), // no password in DSN
};
let colon_idx = scheme_end + colon_offset;
format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
}
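A standalone copy of `redact_dsn` (same logic as above) with a few behavior checks; the DSN values are made up.

```rust
// Mirror of redact_dsn: mask the password between the first `:` after
// the scheme and the last `@`, leaving everything else untouched.
fn redact_dsn(dsn: &str) -> String {
    let scheme_end = match dsn.find("://") {
        Some(i) => i + 3,
        None => return dsn.to_string(),
    };
    let at_idx = match dsn.rfind('@') {
        Some(i) if i > scheme_end => i,
        _ => return dsn.to_string(),
    };
    let userpass = &dsn[scheme_end..at_idx];
    let colon_offset = match userpass.find(':') {
        Some(i) => i,
        None => return dsn.to_string(), // no password present
    };
    let colon_idx = scheme_end + colon_offset;
    format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..])
}

fn main() {
    // Password masked.
    assert_eq!(
        redact_dsn("mysql://daisy:secret@db:3306/shop"),
        "mysql://daisy:***@db:3306/shop"
    );
    // No password: unchanged.
    assert_eq!(redact_dsn("mysql://root@localhost/shop"), "mysql://root@localhost/shop");
    // No credentials at all: unchanged (host colon is after the `@` check).
    assert_eq!(redact_dsn("mysql://localhost/shop"), "mysql://localhost/shop");
}
```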

---

@@ -16,8 +16,46 @@ pub struct Config {
pub auth: AuthConfig,
#[serde(default)]
pub observability: ObservabilityConfig,
#[serde(default)]
pub agent: AgentSettings,
}
/// Phase 16.2 — background autotune agent settings.
///
/// Duplicated from `vectord::agent::AgentConfig` because `shared` can't
/// depend on `vectord` (vectord already depends on shared). The gateway
/// copies these into the vectord config at startup.
#[derive(Debug, Clone, Deserialize)]
pub struct AgentSettings {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_cycle_interval_secs")]
pub cycle_interval_secs: u64,
#[serde(default = "default_cooldown_secs")]
pub cooldown_between_trials_secs: u64,
#[serde(default = "default_min_recall")]
pub min_recall: f32,
#[serde(default = "default_max_trials_per_hour")]
pub max_trials_per_hour: u32,
}
impl Default for AgentSettings {
fn default() -> Self {
Self {
enabled: false,
cycle_interval_secs: default_cycle_interval_secs(),
cooldown_between_trials_secs: default_cooldown_secs(),
min_recall: default_min_recall(),
max_trials_per_hour: default_max_trials_per_hour(),
}
}
}
fn default_cycle_interval_secs() -> u64 { 60 }
fn default_cooldown_secs() -> u64 { 30 }
fn default_min_recall() -> f32 { 0.9 }
fn default_max_trials_per_hour() -> u32 { 30 }
#[derive(Debug, Clone, Deserialize)]
pub struct GatewayConfig {
#[serde(default = "default_host")]
@@ -159,6 +197,7 @@ impl Default for Config {
ai: AiConfig::default(),
auth: AuthConfig::default(),
observability: ObservabilityConfig::default(),
agent: AgentSettings::default(),
}
}
}
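For reference, a sketch of the corresponding `[agent]` section in lakehouse.toml. The values shown simply restate the defaults above; any omitted key falls back to its serde default:

```toml
[agent]
enabled = true                      # default false — agent is opt-in
cycle_interval_secs = 60            # periodic wake interval
cooldown_between_trials_secs = 30   # gap enforced between trials
min_recall = 0.9                    # floor for accepting a config
max_trials_per_hour = 30            # rate limit, protects Ollama under load
```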

---

@@ -255,10 +255,55 @@ pub struct ModelProfile {
pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(default)]
pub created_by: String,
/// Federation layer 2: which bucket this profile's artifacts
/// (trial journals, promotions for its indexes) live in. If set and
/// not yet registered, `POST /vectors/profile/{id}/activate` auto-
/// provisions a local bucket under `storage.profile_root` and
/// registers it before warming indexes. `None` = share the primary
/// bucket (pre-federation behavior).
#[serde(default)]
pub bucket: Option<String>,
/// ADR-019 hybrid: which storage backend serves this profile's
/// indexes. Default Parquet so pre-existing profiles unchanged.
/// New profiles can opt into Lance for append-heavy / random-access
/// / 5M+ vector workloads.
#[serde(default)]
pub vector_backend: VectorBackend,
}
fn default_embed_model() -> String { "nomic-embed-text".to_string() }
/// Per-profile / per-index storage backend choice.
///
/// `Parquet` (default) — vectors as binary-blob in Parquet + in-RAM HNSW
/// for ANN. Best when:
/// - Index ≤ ~5M vectors (fits in RAM)
/// - Search latency dominates (HNSW p50 < 1ms at 100K)
/// - Workload is mostly read, not append
///
/// `Lance` — vectors as native FixedSizeList in a Lance dataset, IVF_PQ
/// on-disk index. Best when:
/// - Index will exceed RAM ceiling (5M+ vectors per index)
/// - Heavy random-row access for RAG (Lance: 311us; Parquet: ~35ms full
/// scan per fetch)
/// - Append-heavy ingest (Lance: 0.08s for 10K rows; Parquet: full
/// ~330MB rewrite per change)
/// - Need to rebuild index frequently (Lance IVF_PQ: 16s; HNSW: 230s
/// at the same scale)
///
/// Defaults to Parquet for backward compat with pre-ADR-019 indexes.
/// See `docs/ADR-019-vector-storage.md` for the full scorecard.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum VectorBackend {
Parquet,
Lance,
}
impl Default for VectorBackend {
fn default() -> Self { VectorBackend::Parquet }
}
/// Soft-delete marker (Phase E).
///
/// Tombstones live beside the dataset in `_catalog/tombstones/{dataset}/`
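With `#[serde(rename_all = "lowercase")]` on `VectorBackend`, the two new profile fields serialize along these lines (other `ModelProfile` fields omitted; the bucket name is hypothetical):

```json
{
  "bucket": "profile:daisy",
  "vector_backend": "lance"
}
```

Omitting both keys keeps pre-federation behavior: primary bucket, Parquet backend.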

---

@@ -13,10 +13,11 @@ use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{delete, get, post, put},
};
use chrono::{DateTime, Utc};
use serde::Deserialize;
use shared::config::BucketConfig;
use std::sync::Arc;
use crate::error_journal::BucketErrorEvent;
@@ -24,16 +25,68 @@ use crate::registry::{BucketInfo, BucketRegistry};
pub fn router(registry: Arc<BucketRegistry>) -> Router {
Router::new()
.route("/buckets", get(list_buckets).post(add_bucket))
.route("/buckets/{name}", delete(remove_bucket))
.route("/errors", get(list_errors))
.route("/errors/flush", post(flush_errors))
.route("/errors/compact", post(compact_errors))
.route("/bucket-health", get(get_health))
.route("/buckets/{bucket}/objects/{*key}", put(put_bucket_object))
.route("/buckets/{bucket}/objects/{*key}", get(get_bucket_object))
.with_state(registry)
}
/// Provision + register a bucket at runtime. Body is a `BucketConfig`.
///
/// Federation layer 2: profile buckets (`profile:alice`) and tenant
/// buckets can be added after startup without a service restart.
async fn add_bucket(
State(reg): State<Arc<BucketRegistry>>,
Json(bc): Json<BucketConfig>,
) -> impl IntoResponse {
// Safety net: local backends must land somewhere under the configured
// profile_root (for profile:*) or be otherwise explicitly rooted.
// Refuse empty / missing root to avoid "bucket created at ./" surprises.
if bc.backend == "local" {
match bc.root.as_deref() {
Some(r) if !r.trim().is_empty() => {}
_ => return Err((StatusCode::BAD_REQUEST,
"local bucket requires a non-empty 'root' path".to_string())),
}
}
match reg.add_bucket(bc).await {
Ok(info) => Ok((StatusCode::CREATED, Json(info))),
Err(e) => {
let code = if e.contains("already registered") {
StatusCode::CONFLICT
} else {
StatusCode::BAD_REQUEST
};
Err((code, e))
}
}
}
/// Unregister a bucket. Refused for primary / rescue / unknown names.
async fn remove_bucket(
State(reg): State<Arc<BucketRegistry>>,
Path(name): Path<String>,
) -> impl IntoResponse {
match reg.remove_bucket(&name) {
Ok(()) => Ok((StatusCode::OK, format!("unregistered: {name}"))),
Err(e) => {
let code = if e.contains("cannot remove") {
StatusCode::FORBIDDEN
} else if e.contains("not registered") {
StatusCode::NOT_FOUND
} else {
StatusCode::BAD_REQUEST
};
Err((code, e))
}
}
}
async fn list_buckets(State(reg): State<Arc<BucketRegistry>>) -> Json<Vec<BucketInfo>> {
Json(reg.list().await)
}
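The `add_bucket` body is a `BucketConfig`; given the local-root check above, a minimal valid request for a local backend looks roughly like this (field set abridged to what the handler reads; the path and bucket name are hypothetical):

```json
{
  "name": "profile:daisy",
  "backend": "local",
  "root": "/var/lakehouse/profiles/daisy"
}
```

A missing or whitespace-only `root` on a local backend is rejected with 400 before the registry is touched.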

---

@@ -15,7 +15,7 @@ use serde::Serialize;
use shared::config::{BucketConfig, StorageConfig};
use shared::secrets::{BucketCredentials, SecretsProvider};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use crate::error_journal::{BucketErrorEvent, ErrorJournal};
@@ -55,10 +55,16 @@ pub enum BucketRole {
}
pub struct BucketRegistry {
/// RwLock because federation layer 2 adds runtime bucket lifecycle
/// (`POST /storage/buckets` / `DELETE /storage/buckets/{name}`).
/// Almost all accesses are reads — writes happen at provision /
/// deactivate time only.
buckets: RwLock<HashMap<String, Arc<BucketEntry>>>,
default: String,
rescue: Option<String>,
profile_root: String,
/// Held so runtime `add_bucket` can resolve secret_ref handles.
secrets: Arc<dyn SecretsProvider>,
journal: ErrorJournal,
}
@@ -113,10 +119,11 @@ impl BucketRegistry {
let _ = journal.load_recent().await;
Ok(Self {
buckets: RwLock::new(buckets),
default: "primary".to_string(),
rescue: cfg.rescue_bucket.clone(),
profile_root: cfg.profile_root.clone(),
secrets,
journal,
})
}
@@ -125,12 +132,19 @@
pub fn rescue_name(&self) -> Option<&str> { self.rescue.as_deref() }
pub fn profile_root(&self) -> &str { &self.profile_root }
pub fn journal(&self) -> &ErrorJournal { &self.journal }
/// Resolve a bucket name to its object store. Existing call sites use
/// this as a drop-in replacement for the old single-store pattern.
///
/// Uses `std::sync::RwLock` — every caller holds the guard for
/// microseconds (clones an Arc and drops), so there's no async-
/// blocking concern. Never hold across an await.
pub fn get(&self, bucket: &str) -> Result<Arc<dyn ObjectStore>, String> {
let guard = self.buckets.read().expect("bucket registry lock poisoned");
guard
.get(bucket)
.map(|e| e.store.clone())
.ok_or_else(|| format!("unknown bucket: {bucket}"))
@@ -139,22 +153,34 @@
/// The default bucket's store — use for code paths that don't yet know
/// about buckets.
pub fn default_store(&self) -> Arc<dyn ObjectStore> {
let guard = self.buckets.read().expect("bucket registry lock poisoned");
guard.get(&self.default).unwrap().store.clone()
}
/// True if this bucket name is registered.
pub fn contains(&self, bucket: &str) -> bool {
let guard = self.buckets.read().expect("bucket registry lock poisoned");
guard.contains_key(bucket)
}
/// List all registered buckets. Checks reachability by doing a trivial
/// `list` with limit 1 on each.
pub async fn list(&self) -> Vec<BucketInfo> {
// Snapshot (name, backend, store_clone) under the lock, then probe
// outside — probing can be slow (network), don't hold the lock.
let snapshot: Vec<(String, String, Arc<dyn ObjectStore>)> = {
let guard = self.buckets.read().expect("bucket registry lock poisoned");
guard
.iter()
.map(|(n, e)| (n.clone(), e.backend.clone(), e.store.clone()))
.collect()
};
let mut out = Vec::with_capacity(snapshot.len());
for (name, backend, store) in snapshot {
let reachable = probe(&store).await;
let role = self.classify(&name);
out.push(BucketInfo { name, backend, reachable, role });
}
out.sort_by(|a, b| a.name.cmp(&b.name));
out
@@ -167,14 +193,69 @@
else { BucketRole::Tenant }
}
/// Provision + register a bucket at runtime (federation layer 2).
/// Returns Err if the name is already registered. Local backends get
/// their root directory created automatically.
pub async fn add_bucket(&self, bc: BucketConfig) -> Result<BucketInfo, String> {
if self.contains(&bc.name) {
return Err(format!("bucket '{}' already registered", bc.name));
}
let store = build_store(&bc, self.secrets.as_ref()).await?;
let reachable = probe(&store).await;
let entry = Arc::new(BucketEntry {
name: bc.name.clone(),
backend: bc.backend.clone(),
store,
config: bc.clone(),
});
let role = self.classify(&bc.name);
let info = BucketInfo {
name: bc.name.clone(),
backend: bc.backend.clone(),
reachable,
role,
};
{
let mut guard = self.buckets.write().expect("bucket registry lock poisoned");
guard.insert(bc.name.clone(), entry);
}
tracing::info!("registered bucket '{}' backend={} reachable={}",
bc.name, bc.backend, reachable);
Ok(info)
}
/// Unregister a bucket. Refuses to remove `primary` or the configured
/// rescue bucket — those are load-bearing. Caller is responsible for
/// any final flush of bucket-local state before calling this.
pub fn remove_bucket(&self, name: &str) -> Result<(), String> {
if name == self.default {
return Err("cannot remove primary bucket".into());
}
if Some(name) == self.rescue.as_deref() {
return Err(format!("cannot remove rescue bucket '{name}'"));
}
let removed = {
let mut guard = self.buckets.write().expect("bucket registry lock poisoned");
guard.remove(name)
};
if removed.is_none() {
return Err(format!("bucket '{name}' not registered"));
}
tracing::info!("unregistered bucket '{}'", name);
Ok(())
}
/// Read with rescue-bucket fallback. If the target bucket fails and a
/// rescue is configured, retries against rescue. Records every failure
/// in the error journal.
pub async fn read_smart(&self, bucket: &str, key: &str) -> Result<ReadOutcome, String> {
let target_store = {
let guard = self.buckets.read().expect("bucket registry lock poisoned");
guard.get(bucket).map(|e| e.store.clone())
.ok_or_else(|| format!("unknown bucket: {bucket}"))?
};
match crate::ops::get(&target_store, key).await {
Ok(data) => Ok(ReadOutcome {
data, rescued: false,
original_bucket: bucket.to_string(),
@@ -187,8 +268,12 @@ impl BucketRegistry {
// Try rescue, if any.
if let Some(rescue_name) = &self.rescue {
if rescue_name != bucket {
let rescue_store = {
let guard = self.buckets.read().expect("bucket registry lock poisoned");
guard.get(rescue_name).map(|e| e.store.clone())
};
if let Some(rescue_store) = rescue_store {
match crate::ops::get(&rescue_store, key).await {
Ok(data) => {
self.journal.mark_rescued_last(bucket, key).await;
return Ok(ReadOutcome {
@@ -219,9 +304,12 @@
key: &str,
data: bytes::Bytes,
) -> Result<(), String> {
let target_store = {
let guard = self.buckets.read().expect("bucket registry lock poisoned");
guard.get(bucket).map(|e| e.store.clone())
.ok_or_else(|| format!("unknown bucket: {bucket}"))?
};
match crate::ops::put(&target_store, key, data).await {
Ok(()) => Ok(()),
Err(err) => {
self.journal.append(BucketErrorEvent::new_write(bucket, key, &err)).await;
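The lock discipline used throughout the registry (clone the `Arc` under a briefly held read guard, then use it with no lock held) can be shown in isolation. A minimal sketch with stand-in types, independent of the real `BucketRegistry`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Stand-in for the map-of-Arcs pattern: readers clone the Arc under a
// short read guard, then use the value with no lock held, so the value
// can safely cross an await point in async code.
struct Registry {
    buckets: RwLock<HashMap<String, Arc<String>>>,
}

impl Registry {
    fn get(&self, name: &str) -> Option<Arc<String>> {
        let guard = self.buckets.read().expect("lock poisoned");
        guard.get(name).cloned() // guard drops here; the Arc outlives it
    }

    fn add(&self, name: &str, backend: Arc<String>) {
        let mut guard = self.buckets.write().expect("lock poisoned");
        guard.insert(name.to_string(), backend);
    }
}

fn main() {
    let reg = Registry { buckets: RwLock::new(HashMap::new()) };
    reg.add("primary", Arc::new("local".to_string()));
    let store = reg.get("primary").unwrap();
    // No lock is held at this point.
    assert_eq!(store.as_str(), "local");
    assert!(reg.get("missing").is_none());
}
```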

---

@@ -0,0 +1,31 @@
[package]
name = "vectord-lance"
version = "0.1.0"
edition = "2024"
# Firewall crate: the Lance stack (Arrow 57, DataFusion 52) is isolated
# from the rest of vectord (Arrow 55, DataFusion 47). Public API uses
# only std types — no Arrow types cross the crate boundary — so no
# version conflict propagates outward.
#
# See docs/ADR-019-vector-storage.md for the rationale: "productionizing
# will need either workspace-wide upgrade or a firewall via a dedicated
# vectord-lance crate." This is that firewall.
[dependencies]
lance = { version = "4.0", default-features = false }
lance-index = { version = "4.0", default-features = false }
lance-linalg = { version = "4.0", default-features = false }
# These Arrow/Parquet versions MUST match Lance's expectations — Lance
# re-exports their types through its API so any mismatch is a hard
# compile error. Keep in sync with lance-bench.
arrow = "57"
arrow-array = "57"
arrow-schema = "57"
parquet = "57"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
bytes = "1"

---

@@ -0,0 +1,528 @@
//! Production Lance vector backend (ADR-019 — hybrid architecture).
//!
//! This is the firewall crate. It owns its own Arrow 57 / DataFusion 52
//! / Lance 4 dependency tree. The public API uses only std types
//! (`Vec<f32>`, `Vec<String>`, `String`, `bool`) so nothing Arrow-shaped
//! crosses the crate boundary. That keeps vectord (Arrow 55) from
//! picking up an incompatible dep.
//!
//! Responsibilities:
//! - Migrate an existing binary-blob Parquet vector file into a Lance
//! dataset with `FixedSizeList<Float32, dims>`. One-time cost.
//! - Append new rows natively (no full-file rewrite — Lance's structural win).
//! - Build an IVF_PQ ANN index on the vector column.
//! - Vector search (`search`) using the IVF_PQ index when present,
//! falling back to full scan otherwise.
//! - Random-access row fetch by `doc_id` (`get_by_doc_id`) — the O(1)
//! lookup that Parquet-on-object-store can't cheaply do.
//! - Cheap count + stats introspection.
use arrow_array::{
Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, Int32Array,
RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use serde::Serialize;
use std::sync::Arc;
use std::time::Instant;
// ================= Public types =================
/// One search result. Mirrors vectord's existing `SearchResult` shape
/// structurally but carries simpler types so this crate stays firewalled.
#[derive(Debug, Clone, Serialize)]
pub struct Hit {
pub doc_id: String,
pub chunk_text: String,
pub score: f32,
/// Optional — set by search_with_vector, not by index-only search.
pub distance: Option<f32>,
}
/// A fully-hydrated row fetched by `get_by_doc_id` — includes the vector
/// so callers can do downstream work (rerank, cite, etc.) without a
/// second round trip.
#[derive(Debug, Clone, Serialize)]
pub struct Row {
pub doc_id: String,
pub chunk_text: String,
pub vector: Vec<f32>,
pub source: Option<String>,
pub chunk_idx: Option<i32>,
}
#[derive(Debug, Clone, Serialize)]
pub struct MigrationStats {
pub rows_written: usize,
pub dimensions: usize,
pub disk_bytes: u64,
pub duration_secs: f32,
}
#[derive(Debug, Clone, Serialize)]
pub struct AppendStats {
pub rows_appended: usize,
pub disk_bytes_added: u64,
pub duration_secs: f32,
}
#[derive(Debug, Clone, Serialize)]
pub struct IndexStats {
pub name: String,
pub num_partitions: u32,
pub num_bits: u32,
pub num_sub_vectors: u32,
pub build_time_secs: f32,
pub disk_bytes_added: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct DatasetStats {
pub path: String,
pub rows: usize,
pub disk_bytes: u64,
pub has_vector_index: bool,
}
// ================= The backend =================
/// Thin wrapper around a Lance dataset path. Lance handles the heavy
/// lifting — we just expose a narrow API.
#[derive(Clone)]
pub struct LanceVectorStore {
/// Local filesystem path or object-store URI (file:///..., s3://...).
/// Lance's internal URI parsing handles both.
path: String,
}
impl LanceVectorStore {
pub fn new(path: impl Into<String>) -> Self {
Self { path: path.into() }
}
pub fn path(&self) -> &str { &self.path }
/// Row count via Lance's fast metadata path (no scan).
pub async fn count(&self) -> Result<usize, String> {
let dataset = open_or_err(&self.path).await?;
let n = dataset.count_rows(None).await.map_err(e)?;
Ok(n)
}
/// True if the on-disk dataset exists AND has at least one `vector`
/// column index attached.
pub async fn has_vector_index(&self) -> Result<bool, String> {
use lance_index::DatasetIndexExt;
let dataset = match lance::dataset::Dataset::open(&self.path).await {
Ok(d) => d,
Err(_) => return Ok(false),
};
let indexes = dataset.load_indices().await.map_err(e)?;
Ok(indexes.iter().any(|ix| {
ix.fields.iter().any(|fid| {
dataset.schema().field_by_id(*fid)
.map(|f| f.name == "vector")
.unwrap_or(false)
})
}))
}
pub async fn stats(&self) -> Result<DatasetStats, String> {
let rows = self.count().await.unwrap_or(0);
let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path));
let has_vector_index = self.has_vector_index().await.unwrap_or(false);
Ok(DatasetStats {
path: self.path.clone(),
rows,
disk_bytes,
has_vector_index,
})
}
/// Migrate a vectord-format Parquet file into a Lance dataset.
///
/// Input schema (vectord's binary-blob format):
/// - source : Utf8
/// - doc_id : Utf8
/// - chunk_idx : Int32
/// - chunk_text : Utf8
/// - vector : Binary (raw f32 little-endian bytes)
///
/// Output schema (Lance-friendly):
/// - source : Utf8
/// - doc_id : Utf8
/// - chunk_idx : Int32
/// - chunk_text : Utf8
/// - vector : FixedSizeList<Float32, dims>
///
/// Idempotent at the file level — if the target exists, it's
/// overwritten. Caller must manage destination paths.
pub async fn migrate_from_parquet_bytes(
&self,
parquet_bytes: &[u8],
) -> Result<MigrationStats, String> {
let t0 = Instant::now();
let (schema, batches, rows) = read_parquet(parquet_bytes)?;
let dims = detect_vector_dims(&batches)?;
let (new_schema, new_batches) = convert_to_fixed_size_list(&schema, batches, dims)?;
// Overwrite any prior dataset at this path.
let _ = std::fs::remove_dir_all(&strip_file_uri(&self.path));
write_dataset(&self.path, new_schema, new_batches).await?;
let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path));
Ok(MigrationStats {
rows_written: rows,
dimensions: dims,
disk_bytes,
duration_secs: t0.elapsed().as_secs_f32(),
})
}
/// Native Lance append — does NOT rewrite existing files. New rows
/// land as a separate fragment; readers union across fragments at
/// query time. Contrast: our Parquet path requires rewriting the
/// entire vector file to add rows.
pub async fn append(
&self,
source: Option<String>,
doc_ids: Vec<String>,
chunk_idxs: Vec<i32>,
chunk_texts: Vec<String>,
vectors: Vec<Vec<f32>>,
) -> Result<AppendStats, String> {
let n = doc_ids.len();
if n == 0 {
return Ok(AppendStats { rows_appended: 0, disk_bytes_added: 0, duration_secs: 0.0 });
}
if chunk_idxs.len() != n || chunk_texts.len() != n || vectors.len() != n {
return Err(format!(
"append: length mismatch (doc_ids={n}, chunk_idxs={}, chunk_texts={}, vectors={})",
chunk_idxs.len(), chunk_texts.len(), vectors.len(),
));
}
let dims = vectors[0].len();
for (i, v) in vectors.iter().enumerate() {
if v.len() != dims {
return Err(format!("append: row {i} has {} dims, expected {}", v.len(), dims));
}
}
let t0 = Instant::now();
let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
let src_arr = StringArray::from(
(0..n).map(|_| source.clone()).collect::<Vec<_>>()
);
let doc_id_arr = StringArray::from(doc_ids);
let chunk_idx_arr = Int32Array::from(chunk_idxs);
let chunk_text_arr = StringArray::from(chunk_texts);
let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
for v in vectors { flat.extend(v); }
let values = Float32Array::from(flat);
let item_field = Arc::new(Field::new("item", DataType::Float32, true));
let vec_arr = FixedSizeListArray::try_new(
item_field.clone(), dims as i32, Arc::new(values), None,
).map_err(e)?;
let schema = Arc::new(Schema::new(vec![
Field::new("source", DataType::Utf8, true),
Field::new("doc_id", DataType::Utf8, false),
Field::new("chunk_idx", DataType::Int32, true),
Field::new("chunk_text", DataType::Utf8, true),
Field::new("vector", DataType::FixedSizeList(item_field, dims as i32), false),
]));
let arrays: Vec<ArrayRef> = vec![
Arc::new(src_arr), Arc::new(doc_id_arr), Arc::new(chunk_idx_arr),
Arc::new(chunk_text_arr), Arc::new(vec_arr),
];
let batch = RecordBatch::try_new(schema.clone(), arrays).map_err(e)?;
let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema);
use lance::dataset::{Dataset, WriteMode, WriteParams};
let params = WriteParams { mode: WriteMode::Append, ..Default::default() };
Dataset::write(reader, &self.path, Some(params)).await.map_err(e)?;
Ok(AppendStats {
rows_appended: n,
disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
duration_secs: t0.elapsed().as_secs_f32(),
})
}
/// Build an IVF_PQ vector index. Replaces any prior index with the
/// same name. Callers pass explicit params — sensible defaults for
/// ~100K × 768d: num_partitions=316 (≈√N), num_bits=8, num_sub_vectors=48.
pub async fn build_index(
&self,
num_partitions: u32,
num_bits: u32,
num_sub_vectors: u32,
) -> Result<IndexStats, String> {
use lance::dataset::Dataset;
use lance::index::vector::VectorIndexParams;
use lance_index::{DatasetIndexExt, IndexType};
use lance_linalg::distance::MetricType;
let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
let t0 = Instant::now();
let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
let params = VectorIndexParams::ivf_pq(
num_partitions as usize,
num_bits as u8,
num_sub_vectors as usize,
MetricType::Cosine,
50, // max_iterations — same as bench
);
dataset.create_index(
&["vector"],
IndexType::Vector,
Some("vec_idx".into()),
&params,
true, // replace
).await.map_err(e)?;
Ok(IndexStats {
name: "vec_idx".into(),
num_partitions,
num_bits,
num_sub_vectors,
build_time_secs: t0.elapsed().as_secs_f32(),
disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
})
}
/// Search for top_k nearest neighbors of `query`. Uses the IVF_PQ
/// index if one exists; otherwise does a full scan (slow but
/// correct — useful during development before index build).
pub async fn search(&self, query: &[f32], top_k: usize) -> Result<Vec<Hit>, String> {
use lance::dataset::Dataset;
let dataset = Dataset::open(&self.path).await.map_err(e)?;
let qarr = Float32Array::from(query.to_vec());
let mut scanner = dataset.scan();
scanner.nearest("vector", &qarr, top_k).map_err(e)?;
scanner.project(&["doc_id", "chunk_text"]).map_err(e)?;
let mut stream = scanner.try_into_stream().await.map_err(e)?;
let mut hits: Vec<Hit> = Vec::with_capacity(top_k);
while let Some(batch) = stream.next().await {
let batch = batch.map_err(e)?;
let doc_ids = batch.column_by_name("doc_id")
.ok_or_else(|| "no doc_id column in search result".to_string())?
.as_any().downcast_ref::<StringArray>()
.ok_or_else(|| "doc_id is not StringArray".to_string())?;
let chunk_texts = batch.column_by_name("chunk_text")
.and_then(|c| c.as_any().downcast_ref::<StringArray>());
// Lance tacks on a `_distance` column for nearest() queries.
let distances = batch.column_by_name("_distance")
.and_then(|c| c.as_any().downcast_ref::<Float32Array>());
for row in 0..batch.num_rows() {
let d = distances.map(|a| a.value(row));
hits.push(Hit {
doc_id: doc_ids.value(row).to_string(),
chunk_text: chunk_texts.map(|a| a.value(row).to_string()).unwrap_or_default(),
score: d.map(|d| 1.0 - d).unwrap_or(0.0), // cosine distance → similarity
distance: d,
});
}
}
Ok(hits)
}
/// Fetch one row by doc_id. Implementation: Lance filter-pushdown
/// scan — O(1) with partition pruning on a proper btree index,
/// O(N) on vector-only datasets (still far faster than reading
/// the whole Parquet file). We don't build a scalar index on
/// doc_id yet; that's a future optimization.
pub async fn get_by_doc_id(&self, doc_id: &str) -> Result<Option<Row>, String> {
use lance::dataset::Dataset;
let dataset = Dataset::open(&self.path).await.map_err(e)?;
let filter = format!("doc_id = '{}'", doc_id.replace('\'', "''"));
let mut scanner = dataset.scan();
scanner.filter(&filter).map_err(e)?;
scanner.limit(Some(1), None).map_err(e)?;
let mut stream = scanner.try_into_stream().await.map_err(e)?;
while let Some(batch) = stream.next().await {
let batch = batch.map_err(e)?;
if batch.num_rows() == 0 { continue; }
return Ok(Some(row_from_batch(&batch, 0)?));
}
Ok(None)
}
}
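`search()` above maps Lance's `_distance` column to a similarity score with `1.0 - d`. A dependency-free sketch of that mapping, assuming `MetricType::Cosine` reports `1 − cos(a, b)`; the helper names here are illustrative, not part of the store:

```rust
/// Cosine distance as an IVF_PQ index with MetricType::Cosine reports it:
/// 1 - cos(a, b). Identical vectors -> 0.0, orthogonal -> 1.0.
fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    1.0 - dot / (na * nb)
}

/// The same conversion search() applies to `_distance`.
fn similarity_from_distance(d: f32) -> f32 {
    1.0 - d
}

fn main() {
    let a = [1.0_f32, 0.0];
    let b = [0.0_f32, 1.0];
    // Identical vectors score ~1.0; orthogonal vectors score ~0.0.
    assert!(similarity_from_distance(cosine_distance(&a, &a)) > 0.999);
    assert!(similarity_from_distance(cosine_distance(&a, &b)).abs() < 1e-6);
}
```

With this mapping a lower distance always means a higher score, so sorting hits by score descending matches Lance's nearest-first ordering.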
// ================= Internal helpers =================
fn e<T: std::fmt::Display>(err: T) -> String { err.to_string() }
/// `file:///abs/path` → `/abs/path`. Leave other URI schemes as-is for
/// helpers that only work on local paths (dir_size_bytes, remove_dir_all).
fn strip_file_uri(uri: &str) -> String {
uri.strip_prefix("file://").unwrap_or(uri).to_string()
}
async fn open_or_err(path: &str) -> Result<lance::dataset::Dataset, String> {
lance::dataset::Dataset::open(path).await.map_err(e)
}
fn dir_size_bytes(path: &str) -> u64 {
fn recurse(p: &std::path::Path) -> u64 {
let Ok(meta) = std::fs::metadata(p) else { return 0; };
if meta.is_file() { return meta.len(); }
let Ok(entries) = std::fs::read_dir(p) else { return 0; };
entries.filter_map(|e| e.ok()).map(|e| recurse(&e.path())).sum()
}
recurse(std::path::Path::new(path))
}
fn read_parquet(bytes: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>, usize), String> {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::copy_from_slice(bytes))
.map_err(e)?;
let schema = builder.schema().clone();
let reader = builder.build().map_err(e)?;
let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>().map_err(e)?;
let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
Ok((schema, batches, rows))
}
fn detect_vector_dims(batches: &[RecordBatch]) -> Result<usize, String> {
for batch in batches {
let idx = batch.schema().index_of("vector")
.map_err(|_| "no 'vector' column".to_string())?;
let col = batch.column(idx);
if let Some(binary) = col.as_any().downcast_ref::<BinaryArray>() {
for i in 0..binary.len() {
if !binary.is_null(i) {
return Ok(binary.value(i).len() / 4);
}
}
} else if let Some(fsl) = col.as_any().downcast_ref::<FixedSizeListArray>() {
return Ok(fsl.value_length() as usize);
}
}
Err("could not determine vector dimensions".into())
}
fn convert_to_fixed_size_list(
schema: &Arc<Schema>,
batches: Vec<RecordBatch>,
dims: usize,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
let new_fields: Vec<Arc<Field>> = schema
.fields()
.iter()
.map(|f| {
if f.name() == "vector" {
Arc::new(Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dims as i32,
),
false,
))
} else {
f.clone()
}
})
.collect();
let new_schema = Arc::new(Schema::new(new_fields));
let mut out = Vec::with_capacity(batches.len());
for batch in batches {
let vec_idx = batch.schema().index_of("vector").map_err(e)?;
let mut new_cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
for (i, col) in batch.columns().iter().enumerate() {
if i == vec_idx {
if let Some(bin) = col.as_any().downcast_ref::<BinaryArray>() {
new_cols.push(Arc::new(binary_to_fsl(bin, dims)?));
} else if col.as_any().is::<FixedSizeListArray>() {
// Already in the right shape — just clone.
new_cols.push(col.clone());
} else {
return Err("vector column is neither Binary nor FixedSizeList".into());
}
} else {
new_cols.push(col.clone());
}
}
out.push(RecordBatch::try_new(new_schema.clone(), new_cols).map_err(e)?);
}
Ok((new_schema, out))
}
fn binary_to_fsl(bin: &BinaryArray, dims: usize) -> Result<FixedSizeListArray, String> {
let n = bin.len();
let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
for i in 0..n {
if bin.is_null(i) {
flat.extend(std::iter::repeat(0.0).take(dims));
continue;
}
let b = bin.value(i);
if b.len() != dims * 4 {
return Err(format!(
"row {i}: {} bytes vs expected {} ({} × f32)",
b.len(), dims * 4, dims,
));
}
for c in b.chunks_exact(4) {
flat.push(f32::from_le_bytes([c[0], c[1], c[2], c[3]]));
}
}
let values = Float32Array::from(flat);
let field = Arc::new(Field::new("item", DataType::Float32, true));
FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None).map_err(e)
}
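`binary_to_fsl` expects each Binary cell to hold exactly `dims × 4` bytes of little-endian f32, per the blob format documented on `migrate_from_parquet_bytes`. A standalone round-trip sketch of that encoding (`encode_blob`/`decode_blob` are illustrative names; vectord's actual writer lives elsewhere):

```rust
/// Pack f32s into the raw little-endian blob layout vectord's Parquet
/// format uses for the `vector` column.
fn encode_blob(v: &[f32]) -> Vec<u8> {
    v.iter().flat_map(|f| f.to_le_bytes()).collect()
}

/// Decode a blob back to f32s, with the same length check binary_to_fsl
/// performs (byte count must be a multiple of 4).
fn decode_blob(b: &[u8]) -> Result<Vec<f32>, String> {
    if b.len() % 4 != 0 {
        return Err(format!("{} bytes is not a multiple of 4", b.len()));
    }
    Ok(b.chunks_exact(4)
        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect())
}

fn main() {
    let v = vec![1.5_f32, -2.0, 0.25];
    let blob = encode_blob(&v);
    assert_eq!(blob.len(), v.len() * 4); // dims × 4, as binary_to_fsl checks
    assert_eq!(decode_blob(&blob).unwrap(), v);
}
```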
async fn write_dataset(
path: &str,
schema: Arc<Schema>,
batches: Vec<RecordBatch>,
) -> Result<(), String> {
use lance::dataset::{Dataset, WriteParams};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
Dataset::write(reader, path, Some(WriteParams::default()))
.await.map_err(e)?;
Ok(())
}
fn row_from_batch(batch: &RecordBatch, row: usize) -> Result<Row, String> {
let doc_id = batch.column_by_name("doc_id")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.map(|a| a.value(row).to_string())
.ok_or_else(|| "missing doc_id".to_string())?;
let chunk_text = batch.column_by_name("chunk_text")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.map(|a| a.value(row).to_string())
.unwrap_or_default();
let source = batch.column_by_name("source")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.and_then(|a| if a.is_null(row) { None } else { Some(a.value(row).to_string()) });
let chunk_idx = batch.column_by_name("chunk_idx")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.and_then(|a| if a.is_null(row) { None } else { Some(a.value(row)) });
let vec_col = batch.column_by_name("vector")
.ok_or_else(|| "no vector column in row fetch".to_string())?;
let fsl = vec_col.as_any().downcast_ref::<FixedSizeListArray>()
.ok_or_else(|| "vector column is not FixedSizeList".to_string())?;
let inner = fsl.value(row);
let floats = inner.as_any().downcast_ref::<Float32Array>()
.ok_or_else(|| "vector inner not Float32".to_string())?;
let mut v = Vec::with_capacity(floats.len());
for i in 0..floats.len() { v.push(floats.value(i)); }
Ok(Row { doc_id, chunk_text, vector: v, source, chunk_idx })
}


@@ -8,6 +8,9 @@ shared = { path = "../shared" }
storaged = { path = "../storaged" }
aibridge = { path = "../aibridge" }
catalogd = { path = "../catalogd" }
# ADR-019 firewall — vectord-lance owns its own Arrow 57 / Lance 4 deps.
# Public API uses only std types so no version conflict propagates here.
vectord-lance = { path = "../vectord-lance" }
tokio = { workspace = true }
axum = { workspace = true }
serde = { workspace = true }
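The ADR-019 comment above pins the pattern: `vectord-lance` keeps its Arrow 57 / Lance 4 types internal and exposes only std types, so Arrow version skew cannot propagate into `vectord`. A hypothetical sketch of what such a firewall surface looks like (names and shapes are illustrative, not the crate's real API):

```rust
/// Crate-boundary hit type: only std types cross — no arrow::RecordBatch,
/// no lance::Dataset leaks into the caller's dependency graph.
pub struct Hit {
    pub doc_id: String,
    pub score: f32,
}

/// Firewall trait: vectors in and out as plain slices/Vecs, errors as
/// String. Any Arrow version can live behind this signature.
pub trait VectorBackend {
    fn search(&self, query: &[f32], top_k: usize) -> Result<Vec<Hit>, String>;
}

/// Stub backend, just to show the boundary compiles with std types only.
struct NullBackend;

impl VectorBackend for NullBackend {
    fn search(&self, _query: &[f32], top_k: usize) -> Result<Vec<Hit>, String> {
        Ok(Vec::with_capacity(top_k)) // no results; real impl calls into Lance
    }
}

fn main() {
    assert!(NullBackend.search(&[0.0; 4], 5).unwrap().is_empty());
}
```

The cost of the firewall is one conversion at the boundary (RecordBatch columns into `Vec<Hit>`); the benefit is that `vectord` never names an Arrow type.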

crates/vectord/src/agent.rs (new file, 711 lines)

@@ -0,0 +1,711 @@
//! Phase 16.2 + 16.5 — The autotune agent.
//!
//! A long-running tokio task that watches the trial journal and
//! autonomously proposes + runs new HNSW configs. Distinct from
//! `autotune::run_autotune` which is synchronous (one HTTP call, grid
//! of trials, done). The agent is the continuous version: it sleeps,
//! wakes on triggers, proposes configs based on prior trial history,
//! runs them one at a time, and auto-promotes when it finds an
//! improvement.
//!
//! Design invariants:
//! - Trials are data (ADR-018). Every proposal reads the journal; every
//! attempt appends to it. The journal is the agent's memory.
//! - One trial at a time. Bounded Ollama load — the agent never fires
//! multiple parallel embeddings and respects `cooldown_between_trials_secs`.
//! - Rate-limited. `max_trials_per_hour` is a hard ceiling so a
//! misbehaving proposal function can't saturate the system.
//! - Never promotes below `min_recall`. Same safety gate as
//! `run_autotune` — we will not make the index worse.
//! - Triggered OR periodic. Ingest enqueues a `DatasetAppended` event
//! when a new batch lands; the agent also wakes periodically to keep
//! exploring even when nothing changed externally.
//! - Graceful shutdown via the `stop_tx` signal — the handle's Drop
//! doesn't force-kill, but `stop()` requests a clean exit after the
//! current trial.
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
use aibridge::client::AiClient;
use catalogd::registry::Registry as CatalogRegistry;
use crate::embedding_cache::EmbeddingCache;
use crate::harness;
use crate::hnsw::HnswStore;
use crate::index_registry::IndexRegistry;
use crate::promotion::{PromotionEntry, PromotionRegistry};
use crate::trial::{HnswConfig, Trial, TrialJournal, TrialMetrics};
// -------- Public-facing types --------
/// Runtime configuration for the agent. Mirrored in shared::config under
/// `[agent]`. Defaults are conservative — designed to tune slowly in the
/// background without fighting real workloads for GPU time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentConfig {
/// Master switch. When false, `spawn` returns a handle but the loop
/// doesn't run.
pub enabled: bool,
/// Periodic wake-up — even if the trigger queue is empty, every N
/// seconds the agent picks an index with trials and proposes one
/// more config. Keeps exploration alive on idle indexes.
pub cycle_interval_secs: u64,
/// Minimum gap between two trials on the SAME index. Prevents the
/// agent from hammering Ollama when a hot index has many pending
/// triggers in a row.
pub cooldown_between_trials_secs: u64,
/// Below this recall, a proposal is never promoted — even if it
/// beats the champion on latency.
pub min_recall: f32,
/// Budget cap: hard ceiling on trials per hour across all indexes.
/// When hit, the agent idles until the hour window rolls.
pub max_trials_per_hour: u32,
}
impl Default for AgentConfig {
fn default() -> Self {
Self {
enabled: false, // opt-in — don't auto-start until J turns it on
cycle_interval_secs: 60,
cooldown_between_trials_secs: 30,
min_recall: 0.9,
max_trials_per_hour: 30,
}
}
}
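Serde-wise, `AgentConfig` maps one field per key onto the `[agent]` table in lakehouse.toml. A sketch of the section with the defaults above, `enabled` flipped on since the agent is opt-in; key names assume serde's default field-name mapping (the authoritative schema lives in `shared::config`):

```toml
[agent]
enabled = true                      # opt-in; defaults to false
cycle_interval_secs = 60            # periodic wake even with an empty queue
cooldown_between_trials_secs = 30   # min gap between trials on the same index
min_recall = 0.9                    # promotion gate — never promote below this
max_trials_per_hour = 30            # hard budget cap across all indexes
```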
/// What caused the agent to look at a particular index. Recorded in the
/// trial's note field so we can tell "new data arrived" trials from
/// "periodic exploration" trials in the journal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerReason {
/// Ingest just appended to a dataset that has attached HNSW indexes.
DatasetAppended { dataset: String },
/// A human or another agent hit `/agent/enqueue/{index}`.
Manual,
/// Periodic wake — no external event, just keep exploring.
Periodic,
}
/// One unit of work for the agent.
#[derive(Debug, Clone)]
pub struct TriggerEvent {
pub index_name: String,
pub reason: TriggerReason,
pub enqueued_at: DateTime<Utc>,
}
impl TriggerEvent {
pub fn manual(index_name: impl Into<String>) -> Self {
Self { index_name: index_name.into(), reason: TriggerReason::Manual, enqueued_at: Utc::now() }
}
pub fn dataset_appended(index_name: impl Into<String>, dataset: impl Into<String>) -> Self {
Self {
index_name: index_name.into(),
reason: TriggerReason::DatasetAppended { dataset: dataset.into() },
enqueued_at: Utc::now(),
}
}
pub fn periodic(index_name: impl Into<String>) -> Self {
Self { index_name: index_name.into(), reason: TriggerReason::Periodic, enqueued_at: Utc::now() }
}
}
/// Observable snapshot of the agent's state — what `/agent/status` returns.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatus {
pub running: bool,
pub config: AgentConfig,
pub queue_depth: usize,
pub trials_run: u64,
pub promotions: u64,
pub trials_in_last_hour: u32,
pub last_event: Option<AgentEvent>,
pub started_at: Option<DateTime<Utc>>,
}
/// Last thing that happened — useful for "why didn't it do anything?" debugging.
#[derive(Debug, Clone, Serialize)]
pub struct AgentEvent {
pub at: DateTime<Utc>,
pub kind: String, // "trial_completed" | "promoted" | "skipped_rate_limit" | etc
pub index_name: Option<String>,
pub detail: String,
}
/// Handle returned by `spawn`. Holds the trigger sender + shared status +
/// stop signal.
#[derive(Clone)]
pub struct AgentHandle {
trigger_tx: mpsc::Sender<TriggerEvent>,
inner: Arc<AgentInner>,
}
struct AgentInner {
status: RwLock<AgentStatus>,
stop_tx: Mutex<Option<oneshot::Sender<()>>>,
queue_len: Mutex<usize>, // mirror of the channel's queued depth — for status reporting
recent_trials: Mutex<VecDeque<DateTime<Utc>>>, // ring of recent trial timestamps for rate limit
}
impl AgentHandle {
/// Enqueue a trigger. Returns Err if the queue is full or the agent
/// task has exited (backpressure — dropping events is acceptable here
/// since periodic exploration will pick up the slack).
pub async fn enqueue(&self, event: TriggerEvent) -> Result<(), String> {
self.trigger_tx.try_send(event).map_err(|e| format!("enqueue: {e}"))?;
let mut guard = self.inner.queue_len.lock().await;
*guard = guard.saturating_add(1);
// Update queue_depth in status for observability.
let mut s = self.inner.status.write().await;
s.queue_depth = *guard;
Ok(())
}
pub async fn status(&self) -> AgentStatus {
let mut s = self.inner.status.read().await.clone();
// Refresh rate-limit window from ring buffer.
let cutoff = Utc::now() - chrono::Duration::hours(1);
let ring = self.inner.recent_trials.lock().await;
s.trials_in_last_hour = ring.iter().filter(|t| **t >= cutoff).count() as u32;
s
}
/// Request a graceful stop. Returns immediately — the loop exits
/// after its current trial completes.
pub async fn stop(&self) -> bool {
let mut guard = self.inner.stop_tx.lock().await;
if let Some(tx) = guard.take() {
let _ = tx.send(());
true
} else {
false
}
}
}
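Note that `trials_in_last_hour` is derived on every `status()` read from the ring of trial timestamps, not kept as a counter. A self-contained sketch of the same sliding-window count, using `std::time::Instant` in place of chrono (the `RateWindow` type is an illustrative stand-in, not the agent's actual struct):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

struct RateWindow {
    recent: VecDeque<Instant>, // ring of trial timestamps, oldest first
    window: Duration,
}

impl RateWindow {
    fn new(window: Duration) -> Self {
        Self { recent: VecDeque::new(), window }
    }
    fn mark(&mut self, at: Instant) {
        self.recent.push_back(at);
    }
    /// Count marks inside the window ending at `now` — recomputed on
    /// every read, like AgentHandle::status() does.
    fn count(&self, now: Instant) -> usize {
        match now.checked_sub(self.window) {
            Some(cutoff) => self.recent.iter().filter(|t| **t >= cutoff).count(),
            None => self.recent.len(), // window reaches before representable time
        }
    }
    fn over_limit(&self, now: Instant, cap: usize) -> bool {
        self.count(now) >= cap
    }
}

fn main() {
    let t0 = Instant::now();
    let mut w = RateWindow::new(Duration::from_secs(3600));
    w.mark(t0);
    w.mark(t0 + Duration::from_secs(10));
    assert_eq!(w.count(t0 + Duration::from_secs(20)), 2);
    // An hour and a bit later, only the second mark is still in the window.
    assert_eq!(w.count(t0 + Duration::from_secs(3605)), 1);
    assert!(!w.over_limit(t0 + Duration::from_secs(3605), 2));
}
```

Deriving the count on read means stale entries never need eviction to stay correct; the ring just grows slowly and old timestamps fall out of the filter.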
// -------- Agent state holder --------
/// Everything the agent needs to run a trial. Mirrors the fields of
/// `VectorState` the agent actually uses. Kept separate so the service
/// layer builds it explicitly — no clone of unneeded state.
#[derive(Clone)]
pub struct AgentDeps {
pub store: Arc<dyn ObjectStore>,
pub ai_client: AiClient,
pub catalog: CatalogRegistry,
pub index_registry: IndexRegistry,
pub hnsw_store: HnswStore,
pub embedding_cache: EmbeddingCache,
pub trial_journal: TrialJournal,
pub promotion_registry: PromotionRegistry,
}
// -------- Spawn --------
/// Start the agent loop in a background tokio task. Returns a handle
/// the caller uses to enqueue events and read status.
pub fn spawn(config: AgentConfig, deps: AgentDeps) -> AgentHandle {
let (trigger_tx, trigger_rx) = mpsc::channel::<TriggerEvent>(256);
let (stop_tx, stop_rx) = oneshot::channel::<()>();
let status = AgentStatus {
running: config.enabled,
config: config.clone(),
queue_depth: 0,
trials_run: 0,
promotions: 0,
trials_in_last_hour: 0,
last_event: None,
started_at: if config.enabled { Some(Utc::now()) } else { None },
};
let inner = Arc::new(AgentInner {
status: RwLock::new(status),
stop_tx: Mutex::new(Some(stop_tx)),
queue_len: Mutex::new(0),
recent_trials: Mutex::new(VecDeque::with_capacity(64)),
});
if config.enabled {
tracing::info!(
"autotune agent started (cycle={}s, cooldown={}s, cap={}/hr, min_recall={})",
config.cycle_interval_secs, config.cooldown_between_trials_secs,
config.max_trials_per_hour, config.min_recall,
);
let loop_inner = inner.clone();
let loop_deps = deps.clone();
let loop_config = config.clone();
tokio::spawn(async move {
run_loop(loop_config, loop_deps, trigger_rx, stop_rx, loop_inner).await;
});
} else {
// Agent disabled — still drain the channel so sends don't back up.
tokio::spawn(async move {
let mut rx = trigger_rx;
while rx.recv().await.is_some() {}
});
tracing::info!("autotune agent configured but disabled (set [agent].enabled=true)");
}
AgentHandle { trigger_tx, inner }
}
// -------- Main loop --------
async fn run_loop(
config: AgentConfig,
deps: AgentDeps,
mut trigger_rx: mpsc::Receiver<TriggerEvent>,
mut stop_rx: oneshot::Receiver<()>,
inner: Arc<AgentInner>,
) {
let mut periodic = tokio::time::interval(std::time::Duration::from_secs(config.cycle_interval_secs));
// First tick fires immediately — skip it so we don't double-fire on startup.
periodic.tick().await;
loop {
let event = tokio::select! {
_ = &mut stop_rx => {
tracing::info!("autotune agent: stop signal received");
let mut s = inner.status.write().await;
s.running = false;
return;
}
maybe = trigger_rx.recv() => match maybe {
Some(ev) => {
let mut guard = inner.queue_len.lock().await;
*guard = guard.saturating_sub(1);
let mut s = inner.status.write().await;
s.queue_depth = *guard;
ev
}
None => {
tracing::info!("autotune agent: trigger channel closed");
return;
}
},
_ = periodic.tick() => {
// Periodic wake — pick an index with existing trials.
// If nothing's been tuned yet there's nothing to propose.
match pick_periodic_target(&deps).await {
Some(idx) => TriggerEvent::periodic(idx),
None => continue,
}
}
};
// Rate limit: count recent trials, skip if over budget.
if over_rate_limit(&inner, config.max_trials_per_hour).await {
record_event(&inner, "skipped_rate_limit", Some(&event.index_name),
format!("hit cap of {}/hour", config.max_trials_per_hour)).await;
continue;
}
// Per-index cooldown.
if cooling_down(&inner, &event.index_name, config.cooldown_between_trials_secs).await {
record_event(&inner, "skipped_cooldown", Some(&event.index_name),
format!("last trial too recent (<{}s)", config.cooldown_between_trials_secs)).await;
continue;
}
// Run one trial.
match run_one_cycle(&event, &deps, config.min_recall).await {
Ok(outcome) => {
mark_recent_trial(&inner).await;
{
let mut s = inner.status.write().await;
s.trials_run += 1;
if outcome.promoted { s.promotions += 1; }
}
record_event(&inner, if outcome.promoted { "promoted" } else { "trial_completed" },
Some(&event.index_name),
format!("config=ec{}/es{} recall={:.3} p50={:.0}us {}",
outcome.trial.config.ef_construction,
outcome.trial.config.ef_search,
outcome.trial.metrics.recall_at_k,
outcome.trial.metrics.search_latency_p50_us,
if outcome.promoted { "★ PROMOTED" } else { "" })).await;
}
Err(e) => {
record_event(&inner, "trial_error", Some(&event.index_name), e).await;
}
}
}
}
/// Result of one cycle — ran a trial, maybe promoted it.
struct CycleOutcome {
trial: Trial,
promoted: bool,
}
/// Core cycle: propose → build → bench → record → maybe promote.
async fn run_one_cycle(
event: &TriggerEvent,
deps: &AgentDeps,
min_recall: f32,
) -> Result<CycleOutcome, String> {
// Read history.
let history = deps.trial_journal.list(&event.index_name).await
.map_err(|e| format!("read journal: {e}"))?;
if history.is_empty() {
return Err(format!(
"no trials yet for '{}' — seed with at least one POST /hnsw/trial first",
event.index_name,
));
}
// Current champion (if any) is the promoted config.
let champion = deps.promotion_registry.get_current(&event.index_name).await;
let champion_trial = champion.as_ref().and_then(|p| {
history.iter().find(|t| t.id == p.trial_id).cloned()
});
// Propose the next config.
let Some(next_config) = propose_next_config(&history, champion_trial.as_ref()) else {
return Err("proposer returned None — search space exhausted".into());
};
// Validate bounds defensively.
if !(10..=400).contains(&next_config.ef_construction) {
return Err(format!("proposed ef_construction={} out of bounds", next_config.ef_construction));
}
if !(10..=200).contains(&next_config.ef_search) {
return Err(format!("proposed ef_search={} out of bounds", next_config.ef_search));
}
// Need a harness to measure. Use the most recent one in history.
// (A future refinement: remember per-index "canonical harness" on
// the index metadata. For now: latest wins.)
let harness_name = history.last().unwrap().eval_set.clone();
let mut harness_set = harness::EvalSet::load(&deps.store, &harness_name).await
.map_err(|e| format!("load harness '{harness_name}': {e}"))?;
let embeddings = deps.embedding_cache.get_or_load(&event.index_name).await
.map_err(|e| format!("embeddings: {e}"))?;
if !harness_set.ground_truth_built {
harness::compute_ground_truth(&mut harness_set, &embeddings, &deps.ai_client).await
.map_err(|e| format!("ground truth: {e}"))?;
harness_set.save(&deps.store).await.ok();
}
// Build + bench.
let trial_id = Trial::new_id();
let slot = format!("{}__{}", event.index_name, trial_id);
let build = deps.hnsw_store
.build_index_with_config(&slot, (*embeddings).clone(), &next_config)
.await?;
let query_vectors: Vec<Vec<f32>> = harness_set.queries
.iter().filter_map(|q| q.query_embedding.clone()).collect();
let bench = deps.hnsw_store.bench_search(&slot, &query_vectors, harness_set.k).await?;
let mut recalls = Vec::with_capacity(harness_set.queries.len());
for (q, hits) in harness_set.queries.iter().zip(bench.retrieved.iter()) {
if let Some(gt) = &q.ground_truth {
recalls.push(harness::recall_at_k(hits, gt, harness_set.k));
}
}
let mean_recall = if recalls.is_empty() { 0.0 } else {
recalls.iter().sum::<f32>() / recalls.len() as f32
};
let mut lats = bench.latencies_us.clone();
lats.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p = |pct: f32| -> f32 {
if lats.is_empty() { return 0.0; }
let idx = ((lats.len() as f32 - 1.0) * pct).round() as usize;
lats[idx.min(lats.len() - 1)]
};
let dims = embeddings.first().map(|e| e.vector.len()).unwrap_or(0);
// Rough estimate: raw vectors plus ~128 bytes/node of HNSW graph overhead.
let memory_bytes = (embeddings.len() * dims * std::mem::size_of::<f32>() + embeddings.len() * 128) as u64;
let note = match &event.reason {
TriggerReason::DatasetAppended { dataset } => format!("agent: dataset_appended({dataset})"),
TriggerReason::Manual => "agent: manual".to_string(),
TriggerReason::Periodic => "agent: periodic".to_string(),
};
let trial = Trial {
id: trial_id,
index_name: event.index_name.clone(),
eval_set: harness_set.name.clone(),
config: next_config.clone(),
metrics: TrialMetrics {
build_time_secs: build.build_time_secs,
search_latency_p50_us: p(0.50),
search_latency_p95_us: p(0.95),
search_latency_p99_us: p(0.99),
recall_at_k: mean_recall,
memory_bytes,
vectors: build.vectors,
eval_queries: harness_set.queries.len(),
brute_force_latency_us: 0.0,
},
created_at: Utc::now(),
note: Some(note),
};
deps.trial_journal.append(&trial).await.ok();
deps.hnsw_store.drop(&slot).await;
// Promotion decision: the new trial must meet recall gate AND beat
// the current champion (higher recall OR same recall + lower p50).
let promoted = if trial.metrics.recall_at_k < min_recall {
false
} else {
let beats = match &champion_trial {
None => true, // no champion yet — anything passing the gate wins
Some(c) => beats_champion(&trial, c),
};
if beats {
let entry = PromotionEntry {
config: trial.config.clone(),
trial_id: trial.id.clone(),
promoted_at: Utc::now(),
promoted_by: "agent".to_string(),
note: Some(format!(
"auto-promote: recall={:.3} p50={:.0}us (was {:.3}/{:.0}us)",
trial.metrics.recall_at_k, trial.metrics.search_latency_p50_us,
champion_trial.as_ref().map(|t| t.metrics.recall_at_k).unwrap_or(0.0),
champion_trial.as_ref().map(|t| t.metrics.search_latency_p50_us).unwrap_or(0.0),
)),
};
deps.promotion_registry.promote(&event.index_name, entry).await.is_ok()
} else {
false
}
};
Ok(CycleOutcome { trial, promoted })
}
/// Champion-beat test: strictly higher recall, OR equal recall with
/// lower p50. Same rule as autotune::pick_winner — kept consistent so the
/// agent and the synchronous autotune agree on what "better" means.
fn beats_champion(candidate: &Trial, champion: &Trial) -> bool {
if candidate.metrics.recall_at_k > champion.metrics.recall_at_k {
return true;
}
if (candidate.metrics.recall_at_k - champion.metrics.recall_at_k).abs() < 1e-4
&& candidate.metrics.search_latency_p50_us < champion.metrics.search_latency_p50_us {
return true;
}
false
}
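The champion-beat rule is worth pinning with concrete cases. A self-contained sketch using simplified stand-in metrics (`M` replaces `Trial`/`TrialMetrics`, keeping only the two fields the rule reads):

```rust
/// Simplified stand-in for TrialMetrics — just the two fields
/// beats_champion() actually reads.
struct M {
    recall: f32,
    p50_us: f32,
}

/// Same rule as beats_champion: strictly higher recall wins outright;
/// a recall tie (within 1e-4) falls through to lower p50 latency.
fn beats(candidate: &M, champion: &M) -> bool {
    if candidate.recall > champion.recall {
        return true;
    }
    (candidate.recall - champion.recall).abs() < 1e-4
        && candidate.p50_us < champion.p50_us
}

fn main() {
    let champ = M { recall: 0.95, p50_us: 800.0 };
    // Higher recall wins even when much slower.
    assert!(beats(&M { recall: 0.96, p50_us: 2000.0 }, &champ));
    // Recall tie: the faster config wins.
    assert!(beats(&M { recall: 0.95, p50_us: 600.0 }, &champ));
    // Lower recall never wins, no matter the speedup.
    assert!(!beats(&M { recall: 0.94, p50_us: 100.0 }, &champ));
}
```

The 1e-4 tolerance matters: recall is a mean over a finite query set, so exact float equality would almost never trigger the latency tiebreak.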
/// Propose the next HnswConfig given trial history and the current
/// champion.
///
/// ============================================================
/// J: THIS IS YOURS TO IMPLEMENT
/// ============================================================
///
/// Inputs:
/// - `history`: every trial ever run on this index, oldest first
/// - `champion`: the currently-promoted trial, if any
///
/// Output:
/// - `Some(HnswConfig)` with the config to try next
/// - `None` if you decide there's nothing worth trying (agent logs
/// "search space exhausted" and moves on)
///
/// Hard bounds the caller enforces: ef_construction ∈ [10, 400],
/// ef_search ∈ [10, 200]. Stay inside those — configs outside get
/// rejected and count as a wasted cycle.
///
/// Design options (pick one, or mix):
///
/// 1. LOCAL REFINEMENT (exploit-heavy):
/// Sample near champion ± small delta. Converges fast, risks local
/// minima. Good for "we know roughly where the sweet spot is."
///
/// 2. ε-GREEDY (mixed):
/// With prob ε, random sample from full bounds (explore). Otherwise
/// refinement around champion (exploit). ε=0.2 is a reasonable start.
/// Good for long-running tune with no prior knowledge.
///
/// 3. COARSE→FINE (annealed):
/// First N trials: wide random. Then shrink the neighborhood around
/// champion as more trials accumulate. Mimics simulated annealing.
///
/// 4. DEDUP-AWARE:
/// Whatever strategy, skip configs already in history. Prevents the
/// agent from re-running the same (ec, es) pair twice.
///
/// The implementation below is ε-greedy around the champion with dedup
/// (options 2 + 4). Swap in your preferred strategy.
pub fn propose_next_config(history: &[Trial], champion: Option<&Trial>) -> Option<HnswConfig> {
// ε-greedy around the champion, dedup-aware.
//
// - With probability ε (≈0.25), sample a random config from the full
// bounds. Keeps exploration alive so we don't get stuck hill-climbing
// one axis.
// - Otherwise: perturb the champion symmetrically on BOTH axes (not
// just +20 / +40 like the starter did). Prefers small moves first
// so recall stays near the current level.
// - Always skip configs already in history — no point re-running.
// - Deterministic per-history: RNG is seeded from history length so
// the same journal state always proposes the same next config.
// Makes tests + offline replay reproducible.
let base = champion
.map(|t| t.config.clone())
.or_else(|| history.last().map(|t| t.config.clone()))
.unwrap_or_default();
let tried = |ec: usize, es: usize| -> bool {
history.iter().any(|t|
t.config.ef_construction == ec && t.config.ef_search == es
)
};
let clamp = |ec: i32, es: i32| -> (usize, usize) {
(ec.clamp(10, 400) as usize, es.clamp(10, 200) as usize)
};
// Tiny xorshift — no rand crate dep. Seeded from history length so the
// proposer is deterministic for a given journal state.
let mut rng = (history.len() as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1;
let mut next_u = || {
rng ^= rng << 13; rng ^= rng >> 7; rng ^= rng << 17;
rng
};
for _attempt in 0..32 {
let explore = (next_u() % 100) < 25; // ε = 0.25
let (ec, es) = if explore {
clamp(10 + (next_u() % 391) as i32, 10 + (next_u() % 191) as i32)
} else {
// Symmetric perturbation — signed steps on both axes.
let dec = [-40, -20, -10, 10, 20, 40][(next_u() % 6) as usize];
let des = [-20, -10, -5, 5, 10, 20][(next_u() % 6) as usize];
clamp(base.ef_construction as i32 + dec, base.ef_search as i32 + des)
};
if !tried(ec, es) {
return Some(HnswConfig { ef_construction: ec, ef_search: es, seed: Some(42) });
}
}
None // 32 attempts all landed on duplicates — likely saturated
}
// -------- Helpers --------
/// Find an index to poke on a periodic wake. Strategy: the most
/// recently promoted index. If nothing has been promoted yet, return None.
async fn pick_periodic_target(deps: &AgentDeps) -> Option<String> {
// `/agent` runs against any index that has a trial journal. We don't
// have a "list all journals" helper, so we derive candidates from the
// promotion registry — any index a human has ever promoted is live.
let promos = deps.promotion_registry.list_all().await.ok()?;
// Prefer the one most recently promoted — it's the one a human cares
// about right now.
promos.into_iter()
.filter_map(|f| f.current.map(|c| (f.index_name, c.promoted_at)))
.max_by_key(|(_, at)| *at)
.map(|(name, _)| name)
}
async fn over_rate_limit(inner: &Arc<AgentInner>, cap: u32) -> bool {
let cutoff = Utc::now() - chrono::Duration::hours(1);
let ring = inner.recent_trials.lock().await;
ring.iter().filter(|t| **t >= cutoff).count() as u32 >= cap
}
async fn cooling_down(inner: &Arc<AgentInner>, _index: &str, cooldown_secs: u64) -> bool {
// Minimal impl: gate on global most-recent trial rather than per-index.
// Per-index cooldown would be easy to add — keep a HashMap<String, DateTime>
// — but for Phase 16.2 MVP, global is fine. Ollama is the shared resource.
let ring = inner.recent_trials.lock().await;
if let Some(last) = ring.back() {
let since = Utc::now().signed_duration_since(*last);
return since < chrono::Duration::seconds(cooldown_secs as i64);
}
false
}
async fn mark_recent_trial(inner: &Arc<AgentInner>) {
let mut ring = inner.recent_trials.lock().await;
ring.push_back(Utc::now());
// Keep bounded.
while ring.len() > 256 {
ring.pop_front();
}
}
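The rate-limit pair above (bounded ring plus count-in-window) can be sketched synchronously with std types only — `Instant` stands in for the chrono timestamps, the async mutex is dropped, and the type/method names are illustrative rather than the real agent API:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Bounded ring of recent trial timestamps, mirroring the shape of
/// AgentInner::recent_trials without the async wrapper.
struct TrialRing {
    ring: VecDeque<Instant>,
    cap: usize,
}

impl TrialRing {
    fn new(cap: usize) -> Self {
        Self { ring: VecDeque::new(), cap }
    }

    fn len(&self) -> usize {
        self.ring.len()
    }

    /// mark_recent_trial: push now, evict the oldest past the bound.
    fn mark(&mut self, now: Instant) {
        self.ring.push_back(now);
        while self.ring.len() > self.cap {
            self.ring.pop_front();
        }
    }

    /// over_rate_limit: count entries inside the trailing window.
    fn over_limit(&self, now: Instant, window: Duration, max: usize) -> bool {
        match now.checked_sub(window) {
            Some(cutoff) => self.ring.iter().filter(|t| **t >= cutoff).count() >= max,
            // Window reaches past the clock's origin: everything counts.
            None => self.ring.len() >= max,
        }
    }
}
```

The `checked_sub` guard matters with `Instant`: subtracting a large window from a recently started monotonic clock can underflow, unlike the chrono arithmetic in the real helper.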
async fn record_event(
inner: &Arc<AgentInner>,
kind: &str,
index: Option<&str>,
detail: String,
) {
tracing::info!("agent: {} {}{}", kind,
index.map(|i| format!("[{i}] ")).unwrap_or_default(), detail);
let mut s = inner.status.write().await;
s.last_event = Some(AgentEvent {
at: Utc::now(),
kind: kind.to_string(),
index_name: index.map(String::from),
detail,
});
}
#[cfg(test)]
mod tests {
use super::*;
fn mk_trial(ec: usize, es: usize, recall: f32, p50: f32) -> Trial {
Trial {
id: format!("t-{ec}-{es}"),
index_name: "test".into(),
eval_set: "eval".into(),
config: HnswConfig { ef_construction: ec, ef_search: es, seed: Some(42) },
metrics: TrialMetrics {
build_time_secs: 1.0,
search_latency_p50_us: p50,
search_latency_p95_us: p50 * 1.5,
search_latency_p99_us: p50 * 2.0,
recall_at_k: recall,
memory_bytes: 0, vectors: 1000, eval_queries: 10,
brute_force_latency_us: 100.0,
},
created_at: Utc::now(),
note: None,
}
}
#[test]
fn propose_skips_duplicates() {
let hist = vec![
mk_trial(80, 30, 1.0, 500.0),
mk_trial(100, 30, 1.0, 520.0), // ec+20
];
let next = propose_next_config(&hist, Some(&hist[0])).unwrap();
// ec+20 is taken, so the proposer should skip it.
assert!(next.ef_construction != 100 || next.ef_search != 30);
}
#[test]
fn beats_champion_strict_recall() {
let champ = mk_trial(80, 30, 0.95, 500.0);
let better_recall = mk_trial(80, 30, 0.99, 600.0);
let worse_recall = mk_trial(80, 30, 0.90, 100.0);
assert!(beats_champion(&better_recall, &champ));
assert!(!beats_champion(&worse_recall, &champ));
}
#[test]
fn beats_champion_same_recall_lower_latency() {
let champ = mk_trial(80, 30, 1.0, 500.0);
let faster = mk_trial(60, 30, 1.0, 400.0);
let slower = mk_trial(60, 30, 1.0, 600.0);
assert!(beats_champion(&faster, &champ));
assert!(!beats_champion(&slower, &champ));
}
}
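The determinism claim above (same journal state proposes the same next config) rests entirely on the seeded xorshift. A minimal std-only sketch with the same seed derivation shows the draw sequence is a pure function of `history.len()` — the multiplier constant is copied from the code above; the helper names are illustrative:

```rust
/// Same seed derivation as propose_next_config: multiply the history
/// length by a large odd constant, then force the low bit so the
/// xorshift state is never zero (xorshift64 is stuck at zero forever).
fn seed_from_history_len(len: usize) -> u64 {
    (len as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1
}

/// One classic xorshift64 step — no rand crate dependency.
fn xorshift64(state: &mut u64) -> u64 {
    *state ^= *state << 13;
    *state ^= *state >> 7;
    *state ^= *state << 17;
    *state
}

/// First n draws for a given journal length. Replaying the same
/// journal always yields the same exploration decisions.
fn draws(history_len: usize, n: usize) -> Vec<u64> {
    let mut s = seed_from_history_len(history_len);
    (0..n).map(|_| xorshift64(&mut s)).collect()
}
```

Because each xorshift step is a bijection on nonzero u64 state, distinct seeds produce distinct sequences, which is what makes the `| 1` nonzero guard load-bearing.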

View File

@ -29,8 +29,21 @@ pub struct IndexMeta {
pub created_at: DateTime<Utc>,
pub build_time_secs: f32,
pub chunks_per_sec: f32,
/// Federation layer 2: which bucket holds this index's artifacts
/// (trial journal + promotion file). Defaults to "primary" for
/// pre-federation indexes — the serde default keeps old metadata
/// files readable without migration.
#[serde(default = "default_bucket")]
pub bucket: String,
/// ADR-019: which physical backend stores this index. `Parquet`
/// means storage_key points at our binary-blob Parquet file;
/// `Lance` means it points at a Lance dataset directory.
#[serde(default)]
pub vector_backend: shared::types::VectorBackend,
}
fn default_bucket() -> String { "primary".to_string() }
/// Registry of all vector indexes.
#[derive(Clone)]
pub struct IndexRegistry {

View File

@ -0,0 +1,112 @@
//! ADR-019 hybrid: routing layer for the Lance vector backend.
//!
//! Holds a `LanceRegistry` which maps an index name to its
//! `LanceVectorStore` instance, lazy-creating on first touch. Path
//! resolution: the index's bucket gives us a bucket root; we append
//! `lance/{index_name}` and use that as the dataset URI. For local
//! buckets that means a directory under the bucket's root.
//!
//! S3 buckets: in principle Lance accepts s3://... URIs, but the
//! firewall crate (`vectord-lance`) doesn't pull the S3 feature in by
//! default. When we promote profile buckets onto S3 in a future phase,
//! enable that feature and update `lance_uri_for` accordingly.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use storaged::registry::BucketRegistry;
use vectord_lance::LanceVectorStore;
use crate::index_registry::IndexRegistry;
/// Convert a bucket+index pair into the URI Lance should use as the
/// dataset path. Local-only for MVP; S3 when we wire that backend.
///
/// Path resolution mirrors lakehouse.toml's convention for local
/// buckets: ./data for primary, ./data/_rescue for rescue, ./data/_testing
/// for testing, ./data/_profiles/{sanitized} for profile:* buckets, and
/// ./data/_buckets/{sanitized} for everything else. Sanitization replaces
/// `:` with `_` so paths are filesystem-safe.
///
/// Refuses unknown buckets so a typo doesn't silently land Lance data
/// in a directory the rest of the system can't see.
pub fn lance_uri_for(
buckets: &BucketRegistry,
bucket: &str,
index_name: &str,
) -> Result<String, String> {
if !buckets.contains(bucket) {
return Err(format!("bucket '{bucket}' not registered"));
}
let root: PathBuf = match bucket {
"primary" => PathBuf::from("./data"),
"rescue" => PathBuf::from("./data/_rescue"),
"testing" => PathBuf::from("./data/_testing"),
b if b.starts_with("profile:") => {
let safe = b.replace(':', "_");
PathBuf::from(format!("./data/_profiles/{safe}"))
}
b => PathBuf::from(format!("./data/_buckets/{}", b.replace(':', "_"))),
};
let dataset_dir = root.join("lance").join(index_name);
// Pre-create the parent so Lance's first write doesn't trip on a
// missing ancestor. Lance handles the dataset directory itself.
let _ = std::fs::create_dir_all(root.join("lance"));
// Canonicalize after the parent is guaranteed to exist; if the
// dataset dir hasn't been created yet, canonicalize the parent and
// tack on the leaf name.
let abs = match std::fs::canonicalize(&root) {
Ok(p) => p.join("lance").join(index_name),
Err(_) => dataset_dir.clone(),
};
Ok(abs.to_string_lossy().to_string())
}
/// Lookup-by-index registry of `LanceVectorStore` handles. One handle
/// per index, lazy-created on first call. Cheap to keep alive — it's
/// just a path string + handle to Lance's metadata cache.
#[derive(Clone)]
pub struct LanceRegistry {
buckets: Arc<BucketRegistry>,
indexes: IndexRegistry,
stores: Arc<RwLock<HashMap<String, LanceVectorStore>>>,
}
impl LanceRegistry {
pub fn new(buckets: Arc<BucketRegistry>, indexes: IndexRegistry) -> Self {
Self {
buckets,
indexes,
stores: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Get (or lazy-create) the LanceVectorStore for an index.
/// Resolves bucket → URI via the registry; backend stays whatever
/// IndexMeta says (no enforcement here — caller decides whether to
/// route to Lance based on profile/index backend choice).
pub async fn store_for(&self, index_name: &str) -> Result<LanceVectorStore, String> {
if let Some(s) = self.stores.read().await.get(index_name) {
return Ok(s.clone());
}
let bucket = self.indexes
.get(index_name).await
.map(|m| m.bucket)
.unwrap_or_else(|| "primary".to_string());
let uri = lance_uri_for(&self.buckets, &bucket, index_name)?;
let store = LanceVectorStore::new(uri);
self.stores.write().await.insert(index_name.to_string(), store.clone());
Ok(store)
}
/// For freshly-created indexes that don't have IndexMeta yet — caller
/// supplies the bucket explicitly.
pub async fn store_for_new(&self, index_name: &str, bucket: &str) -> Result<LanceVectorStore, String> {
let uri = lance_uri_for(&self.buckets, bucket, index_name)?;
let store = LanceVectorStore::new(uri);
self.stores.write().await.insert(index_name.to_string(), store.clone());
Ok(store)
}
}
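The path convention `lance_uri_for` encodes is easy to misremember, so here is the bucket→root mapping isolated as a pure sketch — no registry membership check, no `create_dir_all`, no canonicalize (those stay in the real function), and the helper names are hypothetical:

```rust
use std::path::PathBuf;

/// Mirror of lance_uri_for's bucket-to-root convention, minus the
/// registry check and filesystem side effects.
fn local_root_for(bucket: &str) -> PathBuf {
    match bucket {
        "primary" => PathBuf::from("./data"),
        "rescue" => PathBuf::from("./data/_rescue"),
        "testing" => PathBuf::from("./data/_testing"),
        b if b.starts_with("profile:") => {
            // ':' is not filesystem-safe everywhere — sanitize to '_'.
            PathBuf::from(format!("./data/_profiles/{}", b.replace(':', "_")))
        }
        b => PathBuf::from(format!("./data/_buckets/{}", b.replace(':', "_"))),
    }
}

/// Dataset directory for an index inside its bucket root.
fn lance_dataset_dir(bucket: &str, index_name: &str) -> PathBuf {
    local_root_for(bucket).join("lance").join(index_name)
}
```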

View File

@ -1,4 +1,6 @@
pub mod agent;
pub mod autotune;
pub mod lance_backend;
pub mod chunker;
pub mod embedding_cache;
pub mod harness;

View File

@ -19,13 +19,14 @@
//! - Agent loop — lives in `vectord::agent`.
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use storaged::ops;
use storaged::registry::BucketRegistry;
use tokio::sync::RwLock;
use crate::index_registry::IndexRegistry;
use crate::trial::HnswConfig;
const PROMOTION_PREFIX: &str = "_hnsw_promotions";
@ -55,14 +56,16 @@ pub struct PromotionFile {
#[derive(Clone)]
pub struct PromotionRegistry {
store: Arc<dyn ObjectStore>,
buckets: Arc<BucketRegistry>,
index_registry: IndexRegistry,
cache: Arc<RwLock<HashMap<String, PromotionFile>>>,
}
impl PromotionRegistry {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
pub fn new(buckets: Arc<BucketRegistry>, index_registry: IndexRegistry) -> Self {
Self {
store,
buckets,
index_registry,
cache: Arc::new(RwLock::new(HashMap::new())),
}
}
@ -76,13 +79,26 @@ impl PromotionRegistry {
format!("{PROMOTION_PREFIX}/{safe}.json")
}
/// Resolve which bucket's store holds this index's promotion file.
/// Same rules as TrialJournal::bucket_for — follows IndexMeta.bucket,
/// defaults to primary when metadata is missing.
async fn store_for(&self, index_name: &str) -> Result<Arc<dyn object_store::ObjectStore>, String> {
let bucket = self.index_registry
.get(index_name)
.await
.map(|m| m.bucket)
.unwrap_or_else(|| "primary".to_string());
self.buckets.get(&bucket)
}
/// Load (and cache) the promotion file for an index.
pub async fn load(&self, index_name: &str) -> Result<PromotionFile, String> {
if let Some(cached) = self.cache.read().await.get(index_name) {
return Ok(cached.clone());
}
let store = self.store_for(index_name).await?;
let key = Self::key(index_name);
let file = match ops::get(&self.store, &key).await {
let file = match ops::get(&store, &key).await {
Ok(bytes) => serde_json::from_slice::<PromotionFile>(&bytes)
.map_err(|e| format!("parse promotion file: {e}"))?,
Err(_) => PromotionFile {
@ -118,9 +134,10 @@ impl PromotionRegistry {
file.current = Some(entry);
file.index_name = index_name.to_string();
let store = self.store_for(index_name).await?;
let key = Self::key(index_name);
let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
ops::put(&store, &key, json.into()).await?;
self.cache.write().await.insert(index_name.to_string(), file.clone());
tracing::info!(
@ -146,9 +163,10 @@ impl PromotionRegistry {
file.current = None;
}
}
let store = self.store_for(index_name).await?;
let key = Self::key(index_name);
let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
ops::put(&store, &key, json.into()).await?;
self.cache.write().await.insert(index_name.to_string(), file.clone());
tracing::info!("rolled back promotion for '{}'", index_name);
Ok(file)
@ -169,19 +187,34 @@ impl PromotionRegistry {
}
/// List every index that has a promotion recorded (for operator UI).
///
/// Federation: scans EVERY registered bucket for promotion files.
/// Per-profile buckets each have their own `_hnsw_promotions/` so we
/// aggregate across them. Dedups by index_name — if the same index
/// somehow has promotion files in multiple buckets, the last bucket
/// scanned wins (IndexMeta.bucket is not consulted here).
pub async fn list_all(&self) -> Result<Vec<PromotionFile>, String> {
let keys = ops::list(&self.store, Some(&format!("{PROMOTION_PREFIX}/"))).await.unwrap_or_default();
let mut out = Vec::new();
for key in keys {
if !key.ends_with(".json") { continue; }
let bytes = match ops::get(&self.store, &key).await {
Ok(b) => b,
let bucket_infos = self.buckets.list().await;
let mut by_name: HashMap<String, PromotionFile> = HashMap::new();
for b in &bucket_infos {
let store = match self.buckets.get(&b.name) {
Ok(s) => s,
Err(_) => continue,
};
if let Ok(f) = serde_json::from_slice::<PromotionFile>(&bytes) {
out.push(f);
let keys = ops::list(&store, Some(&format!("{PROMOTION_PREFIX}/")))
.await.unwrap_or_default();
for key in keys {
if !key.ends_with(".json") { continue; }
let bytes = match ops::get(&store, &key).await {
Ok(b) => b,
Err(_) => continue,
};
if let Ok(f) = serde_json::from_slice::<PromotionFile>(&bytes) {
by_name.insert(f.index_name.clone(), f);
}
}
}
Ok(out)
Ok(by_name.into_values().collect())
}
}
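The cross-bucket aggregation in `list_all` reduces to a keyed last-write-wins merge. A std-only sketch with plain string payloads (scan order stands in for the registry's bucket list; the function name is illustrative):

```rust
use std::collections::HashMap;

/// Merge (index_name, payload) pairs from several bucket scans,
/// deduping by index_name. As in list_all, a later bucket's entry
/// for the same index overwrites an earlier one.
fn merge_promotions(scans: Vec<Vec<(String, String)>>) -> HashMap<String, String> {
    let mut by_name = HashMap::new();
    for bucket_files in scans {
        for (index_name, payload) in bucket_files {
            by_name.insert(index_name, payload);
        }
    }
    by_name
}
```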

View File

@ -305,6 +305,8 @@ async fn try_update_index_meta(
created_at: chrono::Utc::now(),
build_time_secs: 0.0,
chunks_per_sec: 0.0,
bucket: "primary".to_string(),
vector_backend: shared::types::VectorBackend::Parquet,
};
index_registry.register(meta).await
}

View File

@ -11,7 +11,8 @@ use std::sync::Arc;
use aibridge::client::{AiClient, EmbedRequest};
use catalogd::registry::Registry as CatalogRegistry;
use crate::{autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, promotion, rag, refresh, search, store, supervisor, trial};
use storaged::registry::BucketRegistry;
use crate::{agent, autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, lance_backend, promotion, rag, refresh, search, store, supervisor, trial};
#[derive(Clone)]
pub struct VectorState {
@ -27,6 +28,33 @@ pub struct VectorState {
pub catalog: CatalogRegistry,
/// Phase 16: promoted HNSW configs. Activation + autotune read/write here.
pub promotion_registry: promotion::PromotionRegistry,
/// Phase 16.2: handle to the background autotune agent. Always
/// present — if the agent is disabled in config, the handle drops
/// incoming triggers silently.
pub agent_handle: agent::AgentHandle,
/// Phase B (federation layer 2): bucket registry for per-profile
/// bucket auto-provisioning on activation.
pub bucket_registry: Arc<BucketRegistry>,
/// Phase C (two-profile VRAM gate): tracks which profile is currently
/// "active" on the GPU. Singleton — one profile at a time holds its
/// model in VRAM. Swapping profiles with different ollama_name unloads
/// the previous one (keep_alive=0) before preloading the new one.
///
/// `None` = no profile has been activated this session; any first
/// activation just preloads and takes the slot.
pub active_profile: Arc<tokio::sync::RwLock<Option<ActiveProfileSlot>>>,
/// ADR-019 hybrid: handles to Lance datasets keyed by index name.
/// Lazy-created on first /vectors/lance/* call.
pub lance: lance_backend::LanceRegistry,
}
/// What the active-profile singleton records. Narrow — we don't need the
/// full ModelProfile here, just enough to know what to unload on swap.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveProfileSlot {
pub profile_id: String,
pub ollama_name: String,
pub activated_at: chrono::DateTime<chrono::Utc>,
}
pub fn router(state: VectorState) -> Router {
@ -59,12 +87,26 @@ pub fn router(state: VectorState) -> Router {
// Phase 17: profile activation — pre-load caches + HNSW for this
// model's bound data. First search after activate is warm.
.route("/profile/{id}/activate", post(activate_profile))
.route("/profile/{id}/deactivate", post(deactivate_profile))
.route("/profile/{id}/search", post(profile_scoped_search))
// Phase 17 VRAM gate: which profile currently owns the GPU?
.route("/profile/active", get(get_active_profile))
// Phase 16: promotion + autotune
.route("/hnsw/promote/{index}/{trial_id}", post(promote_trial))
.route("/hnsw/rollback/{index}", post(rollback_promotion))
.route("/hnsw/promoted/{index}", get(get_promoted))
.route("/hnsw/autotune", post(run_autotune_endpoint))
// Phase 16.2: background autotune agent
.route("/agent/status", get(agent_status))
.route("/agent/stop", post(agent_stop))
.route("/agent/enqueue/{index_name}", post(agent_enqueue))
// ADR-019: Lance hybrid backend
.route("/lance/migrate/{index_name}", post(lance_migrate))
.route("/lance/index/{index_name}", post(lance_build_index))
.route("/lance/search/{index_name}", post(lance_search))
.route("/lance/doc/{index_name}/{doc_id}", get(lance_get_doc))
.route("/lance/append/{index_name}", post(lance_append))
.route("/lance/stats/{index_name}", get(lance_stats))
.with_state(state)
}
@ -81,6 +123,11 @@ struct CreateIndexRequest {
documents: Vec<DocInput>,
chunk_size: Option<usize>,
overlap: Option<usize>,
/// Federation layer 2: optional bucket to hold this index's trial
/// journal + promotion file. Defaults to "primary" — pre-existing
/// clients that don't know about federation keep working unchanged.
#[serde(default)]
bucket: Option<String>,
}
#[derive(Deserialize)]
@ -117,6 +164,7 @@ async fn create_index(
let n_docs = req.documents.len();
let n_chunks = chunks.len();
let index_name = req.index_name.clone();
let bucket = req.bucket.clone().unwrap_or_else(|| "primary".to_string());
// Create job and return immediately
let job_id = state.job_tracker.create(&index_name, n_chunks).await;
@ -157,6 +205,8 @@ async fn create_index(
created_at: chrono::Utc::now(),
build_time_secs: elapsed,
chunks_per_sec: rate,
bucket: bucket.clone(),
vector_backend: shared::types::VectorBackend::Parquet,
};
let _ = registry.register(meta).await;
@ -753,6 +803,11 @@ struct ActivateReport {
failures: Vec<String>,
total_vectors: usize,
duration_secs: f32,
/// Phase C: did we successfully preload the Ollama model?
model_preloaded: bool,
/// Phase C: which profile previously held the GPU slot, if any.
/// Useful for observability of the swap.
previous_profile: Option<String>,
}
#[derive(Serialize)]
@ -788,6 +843,64 @@ async fn activate_profile(
let mut failures = Vec::new();
let mut total_vectors = 0usize;
// Phase 17 / C: VRAM-aware swap. If another profile currently holds
// the GPU and uses a DIFFERENT Ollama model than the one being
// activated, unload it first (keep_alive=0). Same-model activations
// skip the unload — no point churning a model that's already loaded.
let previous_slot = {
let guard = state.active_profile.read().await;
guard.clone()
};
if let Some(prev) = &previous_slot {
if prev.ollama_name != profile.ollama_name {
match state.ai_client.unload_model(&prev.ollama_name).await {
Ok(_) => tracing::info!(
"profile swap: unloaded '{}' ({} -> {})",
prev.ollama_name, prev.profile_id, profile.id,
),
Err(e) => failures.push(format!(
"unload previous model '{}': {e}", prev.ollama_name,
)),
}
}
}
// Federation layer 2: if this profile declares its own bucket and
// that bucket isn't registered yet, auto-provision it under the
// configured profile_root. This is the moment a "dormant" profile
// becomes live — its bucket exists and is readable/writable.
if let Some(bucket_name) = profile.bucket.clone() {
if !state.bucket_registry.contains(&bucket_name) {
let root = format!(
"{}/{}",
state.bucket_registry.profile_root().trim_end_matches('/'),
bucket_name.replace(':', "_"),
);
let bc = shared::config::BucketConfig {
name: bucket_name.clone(),
backend: "local".to_string(),
root: Some(root.clone()),
bucket: None,
region: None,
endpoint: None,
secret_ref: None,
};
match state.bucket_registry.add_bucket(bc).await {
Ok(info) => {
tracing::info!(
"profile '{}' activated bucket '{}' (root={}, reachable={})",
profile.id, bucket_name, root, info.reachable,
);
}
Err(e) => {
failures.push(format!(
"auto-provision bucket '{}': {}", bucket_name, e,
));
}
}
}
}
let all_indexes = state.index_registry.list(None, None).await;
for binding in &profile.bound_datasets {
@ -847,6 +960,30 @@ async fn activate_profile(
}
}
// Preload the new profile's Ollama model proactively. Same-model
// re-activations are cheap (Ollama no-ops if already loaded).
let mut model_preloaded = false;
match state.ai_client.preload_model(&profile.ollama_name).await {
Ok(_) => {
model_preloaded = true;
tracing::info!("profile '{}' preloaded ollama model '{}'",
profile.id, profile.ollama_name);
}
Err(e) => failures.push(format!(
"preload ollama model '{}': {e}", profile.ollama_name,
)),
}
// Take the GPU slot.
{
let mut guard = state.active_profile.write().await;
*guard = Some(ActiveProfileSlot {
profile_id: profile.id.clone(),
ollama_name: profile.ollama_name.clone(),
activated_at: chrono::Utc::now(),
});
}
Ok(Json(ActivateReport {
profile_id: profile.id,
ollama_name: profile.ollama_name,
@ -854,9 +991,52 @@ async fn activate_profile(
failures,
total_vectors,
duration_secs: t0.elapsed().as_secs_f32(),
model_preloaded,
previous_profile: previous_slot.map(|s| s.profile_id),
}))
}
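The swap rule buried in activate_profile is worth stating as a pure function: unload only when a previous profile holds the slot AND it uses a different Ollama model; same-model re-activation leaves the warm model alone. A minimal sketch of just that decision (the function name and shape are hypothetical, not the handler's API):

```rust
/// Which model, if any, to unload before activating `next_model`.
/// None means nothing to unload — either the GPU slot is empty, or
/// the previous profile already has the same model warm.
fn model_to_unload<'a>(prev_model: Option<&'a str>, next_model: &str) -> Option<&'a str> {
    match prev_model {
        Some(prev) if prev != next_model => Some(prev),
        _ => None,
    }
}
```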
/// Unload this profile's model and clear the active slot. The slot is
/// cleared only if this profile currently holds it; the unload is
/// attempted either way.
async fn deactivate_profile(
State(state): State<VectorState>,
Path(profile_id): Path<String>,
) -> impl IntoResponse {
let profile = match state.catalog.get_profile(&profile_id).await {
Some(p) => p,
None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
};
let was_active = {
let mut guard = state.active_profile.write().await;
match guard.as_ref() {
Some(s) if s.profile_id == profile_id => {
let prev = s.clone();
*guard = None;
Some(prev)
}
_ => None,
}
};
// Regardless of whether it held the slot, we can still try to unload —
// the operator's intent is "get this model out of VRAM."
let unload_result = state.ai_client.unload_model(&profile.ollama_name).await;
Ok(Json(serde_json::json!({
"profile_id": profile.id,
"ollama_name": profile.ollama_name,
"was_active": was_active.is_some(),
"unloaded": unload_result.is_ok(),
"unload_error": unload_result.err(),
})))
}
async fn get_active_profile(State(state): State<VectorState>) -> impl IntoResponse {
let slot = state.active_profile.read().await.clone();
Json(slot)
}
#[derive(Deserialize)]
struct ProfileSearchRequest {
index_name: String,
@ -1013,3 +1193,232 @@ async fn run_autotune_endpoint(
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// --- Phase 16.2: autotune agent endpoints ---
async fn agent_status(State(state): State<VectorState>) -> impl IntoResponse {
Json(state.agent_handle.status().await)
}
async fn agent_stop(State(state): State<VectorState>) -> impl IntoResponse {
let stopped = state.agent_handle.stop().await;
Json(serde_json::json!({ "stopped": stopped }))
}
async fn agent_enqueue(
State(state): State<VectorState>,
Path(index_name): Path<String>,
) -> impl IntoResponse {
let event = agent::TriggerEvent::manual(index_name);
match state.agent_handle.enqueue(event).await {
Ok(()) => Ok(Json(serde_json::json!({ "enqueued": true }))),
Err(e) => Err((StatusCode::SERVICE_UNAVAILABLE, e)),
}
}
// --- ADR-019: Lance hybrid backend HTTP surface ---
//
// Lance routes operate on the same `index_name` as the Parquet/HNSW path,
// but materialize the data as a Lance dataset on disk under
// `{bucket_root}/lance/{index_name}/`. The two backends are independent:
// you can have an index in both formats simultaneously. `IndexMeta.vector_backend`
// records which one is canonical for that index.
#[derive(Deserialize)]
struct LanceMigrateRequest {
/// Optional bucket override. Defaults to whatever the existing
/// IndexMeta says, or "primary" for indexes that don't exist yet.
#[serde(default)]
bucket: Option<String>,
}
/// Read the existing Parquet vector file for `index_name` from object
/// storage, hand the bytes to vectord-lance, return migration stats.
/// The original Parquet file is left intact — both backends coexist
/// after migration.
async fn lance_migrate(
State(state): State<VectorState>,
Path(index_name): Path<String>,
Json(req): Json<LanceMigrateRequest>,
) -> impl IntoResponse {
let meta = state.index_registry.get(&index_name).await
.ok_or((StatusCode::NOT_FOUND, format!("index not found: {index_name}")))?;
let bucket = req.bucket.unwrap_or_else(|| meta.bucket.clone());
// Pull the Parquet bytes via storaged::ops — same path as the
// existing embedding loader uses.
let store = state.bucket_registry.get(&bucket)
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let bytes = storaged::ops::get(&store, &meta.storage_key).await
.map_err(|e| (StatusCode::NOT_FOUND, format!("read parquet: {e}")))?;
let lance_store = state.lance.store_for_new(&index_name, &bucket).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let stats = lance_store.migrate_from_parquet_bytes(&bytes).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
tracing::info!(
"lance migrate '{}': {} rows, {}d, {} bytes on disk, {:.2}s",
index_name, stats.rows_written, stats.dimensions,
stats.disk_bytes, stats.duration_secs,
);
Ok::<_, (StatusCode, String)>(Json(serde_json::json!({
"index_name": index_name,
"bucket": bucket,
"lance_path": lance_store.path(),
"stats": stats,
})))
}
#[derive(Deserialize)]
struct LanceIndexRequest {
#[serde(default = "default_partitions")]
num_partitions: u32,
#[serde(default = "default_bits")]
num_bits: u32,
#[serde(default = "default_subvectors")]
num_sub_vectors: u32,
}
fn default_partitions() -> u32 { 316 } // ≈√100K — sane for the reference dataset
fn default_bits() -> u32 { 8 }
fn default_subvectors() -> u32 { 48 } // 768/48 = 16 dims per subvector
/// Build the IVF_PQ index on the Lance dataset.
async fn lance_build_index(
State(state): State<VectorState>,
Path(index_name): Path<String>,
Json(req): Json<LanceIndexRequest>,
) -> impl IntoResponse {
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
match lance_store.build_index(req.num_partitions, req.num_bits, req.num_sub_vectors).await {
Ok(stats) => Ok(Json(stats)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct LanceSearchRequest {
/// Plain text query — embedded server-side for symmetry with the
/// existing /vectors/search path.
query: String,
#[serde(default = "default_top_k")]
top_k: usize,
}
fn default_top_k() -> usize { 5 }
/// Vector search against a Lance dataset. Embeds the query text via the
/// sidecar then calls Lance's nearest-neighbor scanner.
async fn lance_search(
State(state): State<VectorState>,
Path(index_name): Path<String>,
Json(req): Json<LanceSearchRequest>,
) -> impl IntoResponse {
let embed_resp = state.ai_client
.embed(EmbedRequest { texts: vec![req.query.clone()], model: None })
.await
.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
if embed_resp.embeddings.is_empty() {
return Err((StatusCode::BAD_GATEWAY, "no embedding returned".into()));
}
let qv: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let t0 = std::time::Instant::now();
let hits = lance_store.search(&qv, req.top_k).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
Ok(Json(serde_json::json!({
"index_name": index_name,
"query": req.query,
"method": "lance_ivf_pq",
"latency_us": t0.elapsed().as_micros() as u64,
"results": hits,
})))
}
/// Random-access fetch by doc_id — a point lookup the Parquet path can
/// only serve by scanning the whole file.
async fn lance_get_doc(
State(state): State<VectorState>,
Path((index_name, doc_id)): Path<(String, String)>,
) -> impl IntoResponse {
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let t0 = std::time::Instant::now();
match lance_store.get_by_doc_id(&doc_id).await {
Ok(Some(row)) => Ok(Json(serde_json::json!({
"index_name": index_name,
"doc_id": doc_id,
"latency_us": t0.elapsed().as_micros() as u64,
"row": row,
}))),
Ok(None) => Err((StatusCode::NOT_FOUND, format!("doc_id not found: {doc_id}"))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct LanceAppendRequest {
/// Optional source tag — set on every appended row.
#[serde(default)]
source: Option<String>,
rows: Vec<LanceAppendRow>,
}
#[derive(Deserialize)]
struct LanceAppendRow {
doc_id: String,
#[serde(default)]
chunk_idx: Option<i32>,
chunk_text: String,
/// Pre-computed embedding. Caller is responsible for ensuring it
/// matches the dataset's dimensions and embedding model.
vector: Vec<f32>,
}
async fn lance_append(
State(state): State<VectorState>,
Path(index_name): Path<String>,
Json(req): Json<LanceAppendRequest>,
) -> impl IntoResponse {
if req.rows.is_empty() {
return Err((StatusCode::BAD_REQUEST, "rows array is empty".into()));
}
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let mut doc_ids = Vec::with_capacity(req.rows.len());
let mut chunk_idxs = Vec::with_capacity(req.rows.len());
let mut chunk_texts = Vec::with_capacity(req.rows.len());
let mut vectors = Vec::with_capacity(req.rows.len());
for r in req.rows {
doc_ids.push(r.doc_id);
chunk_idxs.push(r.chunk_idx.unwrap_or(0));
chunk_texts.push(r.chunk_text);
vectors.push(r.vector);
}
match lance_store.append(req.source, doc_ids, chunk_idxs, chunk_texts, vectors).await {
Ok(stats) => Ok(Json(stats)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
async fn lance_stats(
State(state): State<VectorState>,
Path(index_name): Path<String>,
) -> impl IntoResponse {
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
match lance_store.stats().await {
Ok(s) => Ok(Json(s)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
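The IVF_PQ defaults above encode two rules of thumb: partition count ≈ √row_count (316 ≈ √100K), and the sub-vector count must divide the embedding dimension evenly (768 / 48 = 16 dims per sub-vector). A sketch of that arithmetic, assuming the 768-dim figure from the defaults' comments; the helper names are illustrative, not part of the Lance API:

```rust
/// Rule of thumb for IVF partition count: roughly the square root
/// of the row count (316 for the 100K reference dataset).
fn suggest_partitions(rows: u64) -> u32 {
    ((rows as f64).sqrt().round() as u32).max(1)
}

/// PQ sub-vector counts must divide the vector dimension evenly;
/// each sub-vector then quantizes dims / num_sub_vectors components.
fn dims_per_subvector(dims: u32, num_sub_vectors: u32) -> Option<u32> {
    if num_sub_vectors == 0 || dims % num_sub_vectors != 0 {
        return None; // uneven split — an invalid PQ configuration
    }
    Some(dims / num_sub_vectors)
}
```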

View File

@ -17,6 +17,9 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use storaged::append_log::AppendLog;
use storaged::registry::BucketRegistry;
use crate::index_registry::IndexRegistry;
/// HNSW build/search parameters the agent can tune.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -86,18 +89,28 @@ impl Trial {
}
/// Per-index append log, lazy-created on first write.
///
/// Federation layer 2: the journal resolves each index's bucket from the
/// index registry and writes its JSONL batches to THAT bucket, not
/// primary. Back-compat is preserved by `IndexMeta::bucket` defaulting
/// to "primary" for pre-federation indexes. Indexes the registry has
/// never heard of (edge case — trials run before first register) fall
/// through to primary as well.
#[derive(Clone)]
pub struct TrialJournal {
store: Arc<dyn ObjectStore>,
/// Cache per-index AppendLog instances so the in-memory buffer persists
/// across calls.
logs: Arc<RwLock<HashMap<String, Arc<AppendLog>>>>,
buckets: Arc<BucketRegistry>,
index_registry: IndexRegistry,
/// Cache per (bucket, index) AppendLog so the in-memory buffer persists
/// across calls. Keyed by `(bucket, index_name)` so moving an index
/// between buckets is clean — the old journal stays intact.
logs: Arc<RwLock<HashMap<(String, String), Arc<AppendLog>>>>,
}
impl TrialJournal {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
pub fn new(buckets: Arc<BucketRegistry>, index_registry: IndexRegistry) -> Self {
Self {
store,
buckets,
index_registry,
logs: Arc::new(RwLock::new(HashMap::new())),
}
}
@@ -106,35 +119,49 @@ impl TrialJournal {
format!("_hnsw_trials/{}", index_name)
}
async fn log_for(&self, index_name: &str) -> Arc<AppendLog> {
if let Some(log) = self.logs.read().await.get(index_name) {
return log.clone();
/// Resolve which bucket holds this index's trial artifacts.
/// Falls back to primary for indexes without recorded metadata.
async fn bucket_for(&self, index_name: &str) -> String {
self.index_registry
.get(index_name)
.await
.map(|m| m.bucket)
.unwrap_or_else(|| "primary".to_string())
}
async fn log_for(&self, index_name: &str) -> Result<Arc<AppendLog>, String> {
let bucket = self.bucket_for(index_name).await;
let key = (bucket.clone(), index_name.to_string());
if let Some(log) = self.logs.read().await.get(&key) {
return Ok(log.clone());
}
let mut guard = self.logs.write().await;
if let Some(log) = guard.get(index_name) {
return log.clone();
if let Some(log) = guard.get(&key) {
return Ok(log.clone());
}
let store = self.buckets.get(&bucket)?;
// Trials arrive one at a time during human/agent iteration — a low
// threshold gives "hit /trials and see my latest attempt" immediacy
// without creating one file per event.
let log = Arc::new(
AppendLog::new(self.store.clone(), Self::prefix(index_name))
AppendLog::new(store, Self::prefix(index_name))
.with_flush_threshold(4),
);
guard.insert(index_name.to_string(), log.clone());
log
guard.insert(key, log.clone());
Ok(log)
}
/// Append a trial record. In-memory buffered; persisted in batches.
pub async fn append(&self, trial: &Trial) -> Result<(), String> {
let line = serde_json::to_vec(trial).map_err(|e| e.to_string())?;
let log = self.log_for(&trial.index_name).await;
let log = self.log_for(&trial.index_name).await?;
log.append(line).await
}
/// Read all trials for an index (flushed batches + unflushed buffer).
pub async fn list(&self, index_name: &str) -> Result<Vec<Trial>, String> {
let log = self.log_for(index_name).await;
let log = self.log_for(index_name).await?;
let lines = log.read_all().await?;
let mut trials = Vec::with_capacity(lines.len());
for line in lines {
@@ -149,13 +176,13 @@ impl TrialJournal {
/// Explicit flush for callers that want write-through semantics
/// (e.g. an agent that wants to commit a trial before querying stats).
pub async fn flush(&self, index_name: &str) -> Result<(), String> {
let log = self.log_for(index_name).await;
let log = self.log_for(index_name).await?;
log.flush().await
}
/// Compact all batch files for an index into one.
pub async fn compact(&self, index_name: &str) -> Result<storaged::append_log::CompactStats, String> {
let log = self.log_for(index_name).await;
let log = self.log_for(index_name).await?;
log.compact().await
}
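`log_for`'s `(bucket, index)` cache is the classic double-checked pattern: an unlocked read fast path, then a re-check after taking the write lock so two concurrent misses build only one AppendLog. A minimal asyncio sketch of the same pattern (names illustrative; a single `asyncio.Lock` stands in for the write half of the RwLock):

```python
import asyncio

class LogCache:
    """Double-checked async cache keyed by (bucket, index_name)."""

    def __init__(self):
        self._logs = {}          # (bucket, index_name) -> log handle
        self._lock = asyncio.Lock()
        self.creations = 0       # how many times a log was actually built

    async def log_for(self, bucket, index_name):
        key = (bucket, index_name)
        log = self._logs.get(key)        # fast path: no lock taken
        if log is not None:
            return log
        async with self._lock:
            log = self._logs.get(key)    # re-check under the lock
            if log is not None:
                return log
            self.creations += 1
            log = object()               # stand-in for AppendLog::new
            self._logs[key] = log
            return log

async def demo():
    cache = LogCache()
    logs = await asyncio.gather(*(cache.log_for("primary", "idx") for _ in range(8)))
    return cache, logs

cache, logs = asyncio.run(demo())
print(cache.creations, all(l is logs[0] for l in logs))
```

The re-check after acquiring the lock is what makes moving an index between buckets clean: a stale caller with the old key still sees its own intact journal, never a half-built one.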


@@ -47,3 +47,14 @@ enabled = false
# Export traces to stdout (set to "otlp" for OpenTelemetry collector)
exporter = "stdout"
service_name = "lakehouse"
[agent]
# Phase 16.2 — background autotune agent. Opt-in: set enabled = true to
# let the agent continuously propose + trial HNSW configs and auto-promote
# winners. Defaults are conservative so it stays out of the way of live
# search traffic on shared Ollama.
enabled = true
cycle_interval_secs = 120 # periodic wake if no triggers
cooldown_between_trials_secs = 10 # min gap between trials
min_recall = 0.9 # never promote below this
max_trials_per_hour = 20 # hard budget cap
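The interplay of `cooldown_between_trials_secs` and `max_trials_per_hour` can be sketched as a sliding-window gate (illustrative names, not the agent's actual internals):

```python
class TrialBudget:
    """Hypothetical sketch of the two gates configured above: a cooldown
    between consecutive trials plus a rolling one-hour trial cap."""

    def __init__(self, max_trials_per_hour=20, cooldown_secs=10):
        self.max_per_hour = max_trials_per_hour
        self.cooldown = cooldown_secs
        self.stamps = []  # start times of trials within the last hour

    def may_start(self, now):
        # Drop stamps that have aged out of the rolling hour window.
        self.stamps = [t for t in self.stamps if now - t < 3600.0]
        if self.stamps and now - self.stamps[-1] < self.cooldown:
            return False  # cooldown between trials not yet elapsed
        if len(self.stamps) >= self.max_per_hour:
            return False  # hard hourly budget cap
        self.stamps.append(now)
        return True


budget = TrialBudget(max_trials_per_hour=2, cooldown_secs=10)
results = [budget.may_start(t) for t in (0.0, 5.0, 15.0, 30.0)]
print(results)
```

Both checks refuse rather than queue: a refused trigger is simply retried on the next periodic wake, which is what keeps the agent from saturating Ollama under live load.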

sidecar/sidecar/admin.py (new file)

@@ -0,0 +1,137 @@
"""Admin endpoints: model lifecycle + VRAM introspection.

Phase 17 / Phase C: the VRAM-aware profile swap story. Ollama loads
models lazily and unloads after a TTL (default 5min). For predictable
swaps between model profiles, we need explicit control:

- unload: send keep_alive=0 to force immediate unload
- preload: send keep_alive=5m with empty prompt to proactively load
- ps: query Ollama's /api/ps to see what's currently loaded

All three are thin wrappers over Ollama's own API. No state held here.
"""
import asyncio
import shutil

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from .ollama import client

router = APIRouter()


class ModelRequest(BaseModel):
    model: str


@router.post("/unload")
async def unload(req: ModelRequest):
    """Force Ollama to unload a model immediately (keep_alive=0 trick)."""
    async with client() as c:
        resp = await c.post(
            "/api/generate",
            json={"model": req.model, "prompt": "", "keep_alive": 0, "stream": False},
        )
        # Ollama returns 200 even on unload; anything else is abnormal.
        if resp.status_code != 200:
            raise HTTPException(502, f"Ollama unload error: {resp.text}")
        return {"unloaded": req.model}


@router.post("/preload")
async def preload(req: ModelRequest):
    """Force Ollama to load a model into VRAM and keep it there."""
    async with client() as c:
        resp = await c.post(
            "/api/generate",
            json={
                "model": req.model,
                # "" can confuse some models; a single token is safer and still ~instant.
                "prompt": " ",
                "keep_alive": "5m",
                "stream": False,
                "options": {"num_predict": 1},
            },
        )
        if resp.status_code != 200:
            raise HTTPException(502, f"Ollama preload error: {resp.text}")
        data = resp.json()
        return {
            "preloaded": req.model,
            "load_duration_ns": data.get("load_duration"),
            "total_duration_ns": data.get("total_duration"),
        }


@router.get("/ps")
async def list_loaded():
    """What models does Ollama currently have in VRAM?"""
    async with client() as c:
        resp = await c.get("/api/ps")
        if resp.status_code != 200:
            raise HTTPException(502, f"Ollama ps error: {resp.text}")
        data = resp.json()
    # Flatten Ollama's fields into something small + useful.
    models = [
        {
            "name": m.get("name"),
            "size_bytes": m.get("size"),
            "size_vram_bytes": m.get("size_vram"),
            "expires_at": m.get("expires_at"),
        }
        for m in data.get("models", [])
    ]
    return {"models": models}


@router.get("/vram")
async def vram_summary():
    """Combined: nvidia-smi VRAM + Ollama loaded models.

    Shells out to nvidia-smi; if it's not on PATH, returns just the
    Ollama view. Intentionally async-via-to_thread so the blocking
    subprocess doesn't stall the event loop.
    """
    gpu = None
    if shutil.which("nvidia-smi"):
        gpu = await asyncio.to_thread(_nvidia_smi_snapshot)
    async with client() as c:
        resp = await c.get("/api/ps")
        loaded = resp.json().get("models", []) if resp.status_code == 200 else []
    return {
        "gpu": gpu,
        "ollama_loaded": [
            {
                "name": m.get("name"),
                "size_vram_mib": (m.get("size_vram", 0) or 0) // (1024 * 1024),
                "expires_at": m.get("expires_at"),
            }
            for m in loaded
        ],
    }


def _nvidia_smi_snapshot():
    """One-shot nvidia-smi poll. Returns None on failure."""
    import subprocess
    try:
        out = subprocess.check_output(
            [
                "nvidia-smi",
                "--query-gpu=memory.used,memory.total,utilization.gpu,name",
                "--format=csv,noheader,nounits",
            ],
            timeout=2,
        ).decode().strip()
        # maxsplit=3 keeps any commas inside the GPU name intact.
        used_mib, total_mib, util_pct, name = [s.strip() for s in out.split(",", 3)]
        return {
            "name": name,
            "used_mib": int(used_mib),
            "total_mib": int(total_mib),
            "utilization_pct": int(util_pct),
        }
    except Exception:
        return None
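The snapshot is what feeds the Phase 17 VRAM gate. A minimal sketch of such a gate, assuming the dict shape `_nvidia_smi_snapshot` returns above (the function name, the reserve margin, and the example numbers are all illustrative, not the actual gate implementation):

```python
def fits_in_vram(gpu, model_size_mib, reserve_mib=512):
    """Hypothetical gate: admit a preload only when reported free VRAM
    covers the model plus a safety reserve. Returns None when there is
    no GPU info, so the caller can decide what to do without a gate."""
    if gpu is None:
        return None
    free_mib = gpu["total_mib"] - gpu["used_mib"]
    return free_mib >= model_size_mib + reserve_mib


# Example snapshot (illustrative numbers, not measured):
snap = {"name": "RTX 4090", "used_mib": 6144, "total_mib": 24576, "utilization_pct": 11}
print(fits_in_vram(snap, 8000), fits_in_vram(snap, 20000))
```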


@@ -2,6 +2,7 @@ import os
from fastapi import FastAPI
from .admin import router as admin_router
from .embed import router as embed_router
from .generate import router as generate_router
from .rerank import router as rerank_router
@@ -11,6 +12,7 @@ app = FastAPI(title="Lakehouse AI Sidecar")
app.include_router(embed_router, prefix="/embed", tags=["embed"])
app.include_router(generate_router, prefix="/generate", tags=["generate"])
app.include_router(rerank_router, prefix="/rerank", tags=["rerank"])
app.include_router(admin_router, prefix="/admin", tags=["admin"])
@app.get("/health")