diff --git a/Cargo.lock b/Cargo.lock index 4326bbf..e238092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -262,7 +262,7 @@ dependencies = [ "arrow-schema 55.2.0", "arrow-select 55.2.0", "atoi", - "base64", + "base64 0.22.1", "chrono", "comfy-table", "half", @@ -284,7 +284,7 @@ dependencies = [ "arrow-schema 57.3.0", "arrow-select 57.3.0", "atoi", - "base64", + "base64 0.22.1", "chrono", "comfy-table", "half", @@ -738,6 +738,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -757,6 +763,24 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "proc-macro2", + "quote", + "regex", + "rustc-hash 2.1.1", + "shlex", + "syn 2.0.117", +] + [[package]] name = "bitflags" version = "2.11.0" @@ -871,6 +895,15 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "btoi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd6407f73a9b8b6162d8a2ef999fe6afd7cc15902ebf42c5cd296addf17e0ad" +dependencies = [ + "num-traits", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -976,6 +1009,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "cfb" version = "0.7.3" @@ -1005,7 +1047,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1f927b07c74ba84c7e5fe4db2baeb3e996ab2688992e39ac68ce3220a677c7e" dependencies = [ - "base64", + "base64 0.22.1", "encoding_rs", ] @@ -1070,6 +1112,26 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + +[[package]] +name = "cmake" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +dependencies = [ + "cc", +] + [[package]] name = "combine" version = "4.6.7" @@ -1283,6 +1345,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -1638,7 +1713,7 @@ dependencies = [ "ahash", "arrow 55.2.0", "arrow-ipc 55.2.0", - "base64", + "base64 0.22.1", "half", "hashbrown 0.14.5", "indexmap", @@ -2041,7 +2116,7 @@ checksum = "2ddf0a0a2db5d2918349c978d42d80926c6aa2459cd8a3c533a84ec4bb63479e" dependencies = [ "arrow 55.2.0", "arrow-buffer 55.2.0", - 
"base64", + "base64 0.22.1", "blake2", "blake3", "chrono", @@ -2070,7 +2145,7 @@ checksum = "07356c94118d881130dd0ffbff127540407d969c8978736e324edcd6c41cd48f" dependencies = [ "arrow 57.3.0", "arrow-buffer 57.3.0", - "base64", + "base64 0.22.1", "blake2", "blake3", "chrono", @@ -2867,7 +2942,7 @@ dependencies = [ "async-tungstenite", "axum", "axum-core", - "base64", + "base64 0.22.1", "bytes", "ciborium", "const-str", @@ -2921,7 +2996,7 @@ checksum = "cda8b152e85121243741b9d5f2a3d8cb3c47a7b2299e902f98b6a7719915b0a2" dependencies = [ "anyhow", "axum-core", - "base64", + "base64 0.22.1", "ciborium", "dioxus-core", "dioxus-document", @@ -3325,6 +3400,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" dependencies = [ "crc32fast", + "libz-sys", "miniz_oxide", "zlib-rs", ] @@ -3705,7 +3781,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "headers-core", "http", @@ -3867,7 +3943,7 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-util", @@ -4063,6 +4139,7 @@ dependencies = [ "chrono", "csv", "lopdf", + "mysql_async", "object_store", "parquet 55.2.0", "serde", @@ -4074,6 +4151,7 @@ dependencies = [ "tokio-postgres", "tracing", "uuid", + "vectord", ] [[package]] @@ -4306,6 +4384,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "keyed_priority_queue" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" +dependencies = [ + "indexmap", +] + [[package]] name = "lance" version = "4.0.0" @@ -4650,7 +4737,7 @@ dependencies = [ "tempfile", "tokio", "tracing", - "twox-hash", + "twox-hash 2.1.2", "uuid", ] @@ -4890,6 +4977,17 @@ dependencies = [ "libc", ] +[[package]] +name = "libz-sys" +version = "1.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -5012,7 +5110,7 @@ version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a" dependencies = [ - "twox-hash", + "twox-hash 2.1.2", ] [[package]] @@ -5021,7 +5119,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746" dependencies = [ - "twox-hash", + "twox-hash 2.1.2", ] [[package]] @@ -5255,6 +5353,68 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2195bf6aa996a481483b29d62a7663eed3fe39600c460e323f8ff41e90bdd89b" +[[package]] +name = "mysql_async" +version = "0.34.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0b66e411c31265e879d9814d03721f2daa7ad07337b6308cb4bb0cde7e6fd47" +dependencies = [ + "bytes", + "crossbeam", + "flate2", + "futures-core", + "futures-sink", + "futures-util", + "keyed_priority_queue", + "lru", + "mysql_common", + "pem", + "percent-encoding", + 
"pin-project", + "rand 0.8.5", + "serde", + "serde_json", + "socket2 0.5.10", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "twox-hash 1.6.3", + "url", +] + +[[package]] +name = "mysql_common" +version = "0.32.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478b0ff3f7d67b79da2b96f56f334431aef65e15ba4b29dd74a4236e29582bdc" +dependencies = [ + "base64 0.21.7", + "bindgen", + "bitflags", + "btoi", + "byteorder", + "bytes", + "cc", + "cmake", + "crc32fast", + "flate2", + "lazy_static", + "num-bigint", + "num-traits", + "rand 0.8.5", + "regex", + "saturating", + "serde", + "serde_json", + "sha1", + "sha2", + "smallvec", + "subprocess", + "thiserror 1.0.69", + "uuid", + "zstd", +] + [[package]] name = "ndarray" version = "0.16.1" @@ -5485,7 +5645,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" dependencies = [ "async-trait", - "base64", + "base64 0.22.1", "bytes", "chrono", "form_urlencoded", @@ -5667,7 +5827,7 @@ dependencies = [ "arrow-ipc 55.2.0", "arrow-schema 55.2.0", "arrow-select 55.2.0", - "base64", + "base64 0.22.1", "brotli", "bytes", "chrono", @@ -5685,7 +5845,7 @@ dependencies = [ "snap", "thrift", "tokio", - "twox-hash", + "twox-hash 2.1.2", "zstd", ] @@ -5703,7 +5863,7 @@ dependencies = [ "arrow-ipc 57.3.0", "arrow-schema 57.3.0", "arrow-select 57.3.0", - "base64", + "base64 0.22.1", "brotli", "bytes", "chrono", @@ -5719,7 +5879,7 @@ dependencies = [ "simdutf8", "snap", "thrift", - "twox-hash", + "twox-hash 2.1.2", "zstd", ] @@ -5741,6 +5901,16 @@ dependencies = [ "stfu8", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -5871,7 +6041,7 @@ version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ee9dd5fe15055d2b6806f4736aa0c9637217074e224bbec46d4041b91bb9491" dependencies = [ - "base64", + "base64 0.22.1", "byteorder", "bytes", "fallible-iterator", @@ -6112,6 +6282,7 @@ dependencies = [ "datafusion 47.0.0", "futures", "object_store", + "parquet 55.2.0", "serde", "serde_json", "shared", @@ -6427,7 +6598,7 @@ version = "0.12.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "cookie", "cookie_store", @@ -6618,6 +6789,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "saturating" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71" + [[package]] name = "schannel" version = "0.1.29" @@ -7039,6 +7216,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "std_prelude" version = "0.2.12" @@ -7106,6 +7289,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "subprocess" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c56e8662b206b9892d7a5a3f2ecdbcb455d3d6b259111373b7e08b8055158a8" +dependencies = [ + "libc", + 
"winapi", +] + [[package]] name = "subsecond" version = "0.7.3" @@ -7196,7 +7389,7 @@ checksum = "64a966cb0e76e311f09cf18507c9af192f15d34886ee43d7ba7c7e3803660c43" dependencies = [ "aho-corasick", "arc-swap", - "base64", + "base64 0.22.1", "bitpacking", "bon", "byteorder", @@ -7655,7 +7848,7 @@ checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" dependencies = [ "async-trait", "axum", - "base64", + "base64 0.22.1", "bytes", "h2", "http", @@ -7872,6 +8065,17 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "rand 0.8.5", + "static_assertions", +] + [[package]] name = "twox-hash" version = "2.1.2" @@ -8004,6 +8208,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vectord" version = "0.1.0" @@ -8012,6 +8222,7 @@ dependencies = [ "arrow 55.2.0", "axum", "bytes", + "catalogd", "chrono", "instant-distance", "object_store", @@ -8023,6 +8234,24 @@ dependencies = [ "tokio", "tracing", "uuid", + "vectord-lance", +] + +[[package]] +name = "vectord-lance" +version = "0.1.0" +dependencies = [ + "arrow 57.3.0", + "arrow-array 57.3.0", + "arrow-schema 57.3.0", + "bytes", + "futures", + "lance", + "lance-index", + "lance-linalg", + "parquet 57.3.0", + "serde", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d886001..a0315b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "crates/gateway", "crates/ui", "crates/lance-bench", + "crates/vectord-lance", ] [workspace.dependencies] @@ -47,3 +48,4 @@ lopdf = "0.35" encoding_rs = "0.8" instant-distance = "0.6" tokio-postgres = { version = "0.7", features = ["with-serde_json-1", "with-chrono-0_4", "with-uuid-1"] } +mysql_async = { version = "0.34", default-features = false, features = ["minimal"] } diff --git a/crates/aibridge/src/client.rs b/crates/aibridge/src/client.rs index 97e7cec..1340528 100644 --- a/crates/aibridge/src/client.rs +++ b/crates/aibridge/src/client.rs @@ -134,4 +134,47 @@ impl AiClient { } resp.json().await.map_err(|e| format!("rerank parse error: {e}")) } + + /// Force Ollama to unload the named model from VRAM (keep_alive=0). + /// Used for predictable profile swaps — without this, Ollama holds a + /// model for its configured TTL (default 5min) and the previous + /// profile's model can linger in VRAM next to the new one. + pub async fn unload_model(&self, model: &str) -> Result { + let resp = self.client + .post(format!("{}/admin/unload", self.base_url)) + .json(&serde_json::json!({ "model": model })) + .send().await + .map_err(|e| format!("unload request failed: {e}"))?; + if !resp.status().is_success() { + let text = resp.text().await.unwrap_or_default(); + return Err(format!("unload error: {text}")); + } + resp.json().await.map_err(|e| format!("unload parse error: {e}")) + } + + /// Ask Ollama to load the named model into VRAM proactively. Makes + /// the first real request after profile activation fast (no cold-load + /// latency). 
diff --git a/crates/aibridge/src/service.rs b/crates/aibridge/src/service.rs index dee4847..de524a2 100644 --- a/crates/aibridge/src/service.rs +++ b/crates/aibridge/src/service.rs @@ -16,9 +16,17 @@ pub fn router(client: AiClient) -> Router { .route("/embed", post(embed)) .route("/generate", post(generate)) .route("/rerank", post(rerank)) + .route("/vram", get(vram)) .with_state(client) } +async fn vram(State(client): State<AiClient>) -> impl IntoResponse { + match client.vram_snapshot().await { + Ok(snap) => Ok(Json(snap)), + Err(e) => Err((StatusCode::BAD_GATEWAY, e)), + } +} + async fn health(State(client): State<AiClient>) -> impl IntoResponse { match client.health().await { Ok(info) => Ok(Json(info)), diff --git a/crates/catalogd/src/service.rs b/crates/catalogd/src/service.rs index 52854e1..3c85831 100644 --- a/crates/catalogd/src/service.rs +++ b/crates/catalogd/src/service.rs @@ -357,6 +357,14 @@ struct CreateProfileRequest { embed_model: String, #[serde(default)] created_by: String, + /// Federation: optional per-profile bucket (`profile:{id}` by convention). + /// Omitting it keeps artifacts in the primary bucket. + #[serde(default)] + bucket: Option<String>, + /// ADR-019 hybrid: which vector backend to route this profile's + /// indexes to. Defaults to Parquet+HNSW. + #[serde(default)] + vector_backend: shared::types::VectorBackend, } fn default_embed_model_req() -> String { "nomic-embed-text".to_string() } @@ -374,6 +382,8 @@ async fn create_profile( embed_model: req.embed_model, created_at: chrono::Utc::now(), created_by: req.created_by, + bucket: req.bucket, + vector_backend: req.vector_backend, }; match registry.put_profile(profile).await { Ok(p) => Ok((StatusCode::CREATED, Json(p))),
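The wire shape `CreateProfileRequest` now accepts, as a sketch. The hunk only shows the new fields plus `embed_model`/`created_by`, so the `name` field below is an assumption; `vector_backend` uses the lowercase serde encoding defined in `shared::types`.

```rust
// Hypothetical request body for profile creation; `name` is assumed,
// not confirmed by this hunk.
fn example_create_profile_body() -> serde_json::Value {
    serde_json::json!({
        "name": "alice",
        "embed_model": "nomic-embed-text",
        "created_by": "admin",
        "bucket": "profile:alice",     // omit to stay in the primary bucket
        "vector_backend": "lance"      // or "parquet" (the default)
    })
}
```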
diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 82508c8..14e97f1 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -75,6 +75,41 @@ async fn main() { // AI sidecar client let ai_client = aibridge::client::AiClient::new(&config.sidecar.url); + // Vector service components — built before the router because both the + // /vectors service AND ingestd need the agent handle to enqueue triggers. + let index_reg = vectord::index_registry::IndexRegistry::new(store.clone()); + let _ = index_reg.rebuild().await; + let hnsw = vectord::hnsw::HnswStore::new(); + let emb_cache = vectord::embedding_cache::EmbeddingCache::new(store.clone()); + // Phase B (federation layer 2): trial journals + promotion files now + // live in each index's recorded bucket (IndexMeta.bucket), not always + // primary. Journal / registry resolve per-call via the bucket registry. + let tj = vectord::trial::TrialJournal::new(bucket_registry.clone(), index_reg.clone()); + let pr = vectord::promotion::PromotionRegistry::new(bucket_registry.clone(), index_reg.clone()); + + // Phase 16.2: spawn the autotune agent. When config.agent.enabled=false + // this returns a handle that drops triggers silently — no surprise load. + let agent_cfg = vectord::agent::AgentConfig { + enabled: config.agent.enabled, + cycle_interval_secs: config.agent.cycle_interval_secs, + cooldown_between_trials_secs: config.agent.cooldown_between_trials_secs, + min_recall: config.agent.min_recall, + max_trials_per_hour: config.agent.max_trials_per_hour, + }; + let agent_handle = vectord::agent::spawn( + agent_cfg, + vectord::agent::AgentDeps { + store: store.clone(), + ai_client: ai_client.clone(), + catalog: registry.clone(), + index_registry: index_reg.clone(), + hnsw_store: hnsw.clone(), + embedding_cache: emb_cache.clone(), + trial_journal: tj.clone(), + promotion_registry: pr.clone(), + }, + ); + // HTTP router let mut app = Router::new() .route("/health", get(health)) @@ -87,21 +122,25 @@ store: store.clone(), registry: registry.clone(), buckets: bucket_registry.clone(), + agent_handle: agent_handle.clone(), + index_registry: index_reg.clone(), })) - .nest("/vectors", vectord::service::router({ - let index_reg = vectord::index_registry::IndexRegistry::new(store.clone()); - let _ = index_reg.rebuild().await; - vectord::service::VectorState { - store: store.clone(), - ai_client: ai_client.clone(), - job_tracker: vectord::jobs::JobTracker::new(), - index_registry: index_reg, - hnsw_store: vectord::hnsw::HnswStore::new(), - embedding_cache: vectord::embedding_cache::EmbeddingCache::new(store.clone()), - trial_journal: vectord::trial::TrialJournal::new(store.clone()), - catalog: registry.clone(), - promotion_registry: vectord::promotion::PromotionRegistry::new(store.clone()) - } + .nest("/vectors", vectord::service::router(vectord::service::VectorState { + store: store.clone(), + ai_client: ai_client.clone(), + job_tracker: vectord::jobs::JobTracker::new(), + index_registry: index_reg.clone(), + hnsw_store: hnsw, + embedding_cache: emb_cache, + trial_journal: tj, + catalog: registry.clone(), + promotion_registry: pr, + agent_handle, + bucket_registry: bucket_registry.clone(), + active_profile: std::sync::Arc::new(tokio::sync::RwLock::new(None)), + lance: vectord::lance_backend::LanceRegistry::new( + bucket_registry.clone(), index_reg.clone(), + ), })) .nest("/workspaces", queryd::workspace_service::router(workspace_mgr)) .nest("/journal", journald::service::router(journal)) diff --git a/crates/ingestd/Cargo.toml b/crates/ingestd/Cargo.toml index 9bc1ed3..a28546b 100644 --- a/crates/ingestd/Cargo.toml +++ b/crates/ingestd/Cargo.toml @@ -7,6 +7,7 @@ shared = { path = "../shared" } storaged = { path = "../storaged" } catalogd = { path = "../catalogd" } +vectord = { path = "../vectord" } tokio = { workspace = true } axum = { workspace = true, features = ["multipart"] } lopdf = { workspace = true } @@ -21,4 +22,5 @@ csv = { workspace = true } chrono = { workspace = true } object_store = { workspace = true } tokio-postgres = { workspace = true } +mysql_async = { workspace = true } uuid = { workspace = true } diff --git a/crates/ingestd/src/lib.rs b/crates/ingestd/src/lib.rs index b133fdf..fc0c0f3 100644 --- a/crates/ingestd/src/lib.rs +++ b/crates/ingestd/src/lib.rs @@ -1,4 +1,5 @@ pub mod db_ingest; +pub mod my_stream; pub mod pg_stream; pub mod detect; pub mod csv_ingest;
diff --git a/crates/ingestd/src/my_stream.rs b/crates/ingestd/src/my_stream.rs new file mode 100644 index 0000000..8fce599 --- /dev/null +++ b/crates/ingestd/src/my_stream.rs @@ -0,0 +1,410 @@ +/// Streaming MySQL ingest. +/// +/// Mirrors `pg_stream` for MySQL sources. Same OFFSET-paginated strategy, +/// same Parquet-streaming shape. Uses `mysql_async` (pure Rust) so we +/// don't need a C client library at build time. +/// +/// Type mapping follows ADR-010 (default to string on ambiguity): +/// - Booleans (TINYINT(1)) and integer types map to Arrow Int32/Int64 +/// - Floating point and decimals → Float64 +/// - Everything else (text, varchar, json, date, timestamp) → Utf8 +/// +/// What's deliberately not supported (yet): +/// - TLS connections — the `minimal` feature is plain TCP only. Upgrade +/// when a tenant actually needs it. +/// - Keyset pagination — OFFSET scans are O(N²) at multi-million-row +/// scale; same caveat as `pg_stream`. +/// - BINARY/BLOB columns — currently rendered as lossy UTF-8 text via +/// the string fallback in `cell_as_string`. + +use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use mysql_async::prelude::*; +use mysql_async::{Conn, Opts, Row, Value}; +use parquet::arrow::ArrowWriter; +use std::sync::Arc; + +/// Request shape for MySQL streaming ingest. +#[derive(Debug, Clone, serde::Deserialize)] +pub struct MyStreamRequest { + /// mysql://user:pass@host:port/db + pub dsn: String, + pub table: String, + #[serde(default)] + pub dataset_name: Option<String>, + /// Rows per fetch. Default 10_000. + #[serde(default)] + pub batch_size: Option<usize>, + /// Column to ORDER BY for stable pagination. If omitted, the first + /// column returned by the schema probe is used. + #[serde(default)] + pub order_by: Option<String>, + /// Hard cap on total rows (for sampling / previews). + #[serde(default)] + pub limit: Option<usize>, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct MyStreamResult { + pub table: String, + pub rows: usize, + pub batches: usize, + pub columns: usize, + pub schema: Vec<String>, + pub parquet_bytes: u64, + pub duration_secs: f32, +} + +/// Parsed DSN pieces. Kept local (rather than reusing pg's DbConfig) so +/// the MySQL connector doesn't depend on the PG path. +#[derive(Debug, Clone)] +pub struct MyConfig { + pub host: String, + pub port: u16, + pub user: String, + pub password: String, + pub database: String, +} + +impl MyConfig { + /// Build a mysql_async `Opts` from the parsed config. + pub fn to_opts(&self) -> Opts { + let url = if self.password.is_empty() { + format!( + "mysql://{}@{}:{}/{}", + percent(&self.user), percent(&self.host), self.port, percent(&self.database), + ) + } else { + format!( + "mysql://{}:{}@{}:{}/{}", + percent(&self.user), percent(&self.password), + percent(&self.host), self.port, percent(&self.database), + ) + }; + Opts::from_url(&url).expect("MyConfig.to_opts: rebuilt URL should parse") + } +} + +/// Minimal URL-encoder for the few characters that commonly appear in +/// MySQL passwords. mysql_async's URL parser expects valid URL-encoded +/// components. +fn percent(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for c in s.chars() { + match c { + '@' | ':' | '/' | '?' | '#' | '[' | ']' | ' ' | '%' => { + out.push_str(&format!("%{:02X}", c as u32)); + } + _ => out.push(c), + } + } + out +} + +/// Parse a mysql:// DSN.
+/// Supports: mysql://[user[:password]@]host[:port][/db] +pub fn parse_dsn(dsn: &str) -> Result<MyConfig, String> { + let rest = dsn + .strip_prefix("mysql://") + .ok_or_else(|| "DSN must start with mysql://".to_string())?; + + let (auth_host, database) = match rest.split_once('/') { + Some((ah, db)) => (ah, db.split('?').next().unwrap_or(db).to_string()), + None => (rest, String::new()), + }; + + let (userpass, hostport) = match auth_host.rsplit_once('@') { + Some((up, hp)) => (Some(up), hp), + None => (None, auth_host), + }; + + let (user, password) = match userpass { + Some(up) => match up.split_once(':') { + Some((u, p)) => (u.to_string(), p.to_string()), + None => (up.to_string(), String::new()), + }, + None => ("root".to_string(), String::new()), + }; + + let (host, port) = match hostport.rsplit_once(':') { + Some((h, p)) => ( + h.to_string(), + p.parse::<u16>().map_err(|_| format!("invalid port in DSN: {p}"))?, + ), + None => (hostport.to_string(), 3306), + }; + + if host.is_empty() { + return Err("DSN has no host".into()); + } + if database.is_empty() { + return Err("DSN has no database path (mysql://... /db)".into()); + } + + Ok(MyConfig { host, port, user, password, database }) +} + +/// Stream a MySQL table as Parquet bytes. +pub async fn stream_table_to_parquet( + req: &MyStreamRequest, +) -> Result<(bytes::Bytes, MyStreamResult), String> { + let t0 = std::time::Instant::now(); + let config = parse_dsn(&req.dsn)?; + let batch_size = req.batch_size.unwrap_or(10_000).max(1); + + let mut conn = Conn::new(config.to_opts()).await + .map_err(|e| format!("mysql connect: {e}"))?; + + // Probe columns via information_schema — gives real type names that + // we can map to Arrow dtypes without needing to fetch a row first. + let columns = probe_columns(&mut conn, &config.database, &req.table).await?; + if columns.is_empty() { + return Err(format!("table '{}' not found or has no columns", req.table)); + } + + let arrow_fields: Vec<Field> = columns + .iter() + .map(|(name, ty)| Field::new(name, mysql_type_to_arrow(ty), true)) + .collect(); + let schema = Arc::new(Schema::new(arrow_fields)); + let schema_report: Vec<String> = columns + .iter() + .map(|(n, t)| format!("{}:{}", n, t)) + .collect(); + + let order_col = req.order_by.clone().unwrap_or_else(|| columns[0].0.clone()); + + let mut buf: Vec<u8> = Vec::with_capacity(1024 * 1024); + let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None) + .map_err(|e| format!("ArrowWriter init: {e}"))?; + + let mut total_rows: usize = 0; + let mut batch_count: usize = 0; + let row_cap = req.limit.unwrap_or(usize::MAX); + + loop { + let remaining = row_cap.saturating_sub(total_rows); + if remaining == 0 { break; } + let fetch = remaining.min(batch_size); + + // Backticks are MySQL's identifier quote. Forbid backticks in + // table/column names to prevent injection — neither the pg path + // nor this one should accept such identifiers anyway.
if req.table.contains('`') || order_col.contains('`') { + return Err("table or order_by column contains backticks — refused".into()); + } + let sql = format!( + "SELECT * FROM `{}` ORDER BY `{}` LIMIT {} OFFSET {}", + req.table, order_col, fetch, total_rows, + ); + + let rows: Vec<Row> = conn.query(&sql).await + .map_err(|e| format!("fetch batch at offset {total_rows}: {e}"))?; + + if rows.is_empty() { break; } + let n = rows.len(); + + let arrays: Vec<ArrayRef> = columns + .iter() + .enumerate() + .map(|(idx, (_, ty))| rows_to_column(&rows, idx, ty)) + .collect::<Result<_, String>>()?; + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch: {e}"))?; + writer.write(&batch).map_err(|e| format!("ArrowWriter::write: {e}"))?; + + total_rows += n; + batch_count += 1; + tracing::info!( + "mysql stream '{}': fetched batch {} ({} rows, total {})", + req.table, batch_count, n, total_rows, + ); + + if n < fetch { break; } + } + + writer.close().map_err(|e| format!("ArrowWriter::close: {e}"))?; + conn.disconnect().await.ok(); + + let result = MyStreamResult { + table: req.table.clone(), + rows: total_rows, + batches: batch_count, + columns: columns.len(), + schema: schema_report, + parquet_bytes: buf.len() as u64, + duration_secs: t0.elapsed().as_secs_f32(), + }; + + Ok((bytes::Bytes::from(buf), result)) +} + +async fn probe_columns( + conn: &mut Conn, + schema: &str, + table: &str, +) -> Result<Vec<(String, String)>, String> { + let sql = format!( + "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns \ + WHERE table_schema = '{}' AND table_name = '{}' \ + ORDER BY ORDINAL_POSITION", + schema.replace('\'', "''"), + table.replace('\'', "''"), + ); + let rows: Vec<(String, String)> = conn.query(&sql).await + .map_err(|e| format!("probe columns: {e}"))?; + Ok(rows) +} + +/// MySQL data type string → Arrow DataType. Conservative: anything we +/// don't recognize becomes Utf8 (ADR-010). +fn mysql_type_to_arrow(ty: &str) -> DataType { + match ty.to_lowercase().as_str() { + "tinyint" | "smallint" | "mediumint" | "int" | "integer" => DataType::Int32, + "bigint" => DataType::Int64, + "float" | "double" | "decimal" | "numeric" | "real" => DataType::Float64, + "bit" | "bool" | "boolean" => DataType::Boolean, + _ => DataType::Utf8, + } +} + +/// Convert a single column slice of MySQL rows into an Arrow array.
fn rows_to_column( + rows: &[Row], + idx: usize, + ty: &str, +) -> Result<ArrayRef, String> { + let arrow_ty = mysql_type_to_arrow(ty); + match arrow_ty { + DataType::Boolean => { + let v: Vec<Option<bool>> = rows.iter().map(|r| cell_as_bool(r, idx)).collect(); + Ok(Arc::new(BooleanArray::from(v))) + } + DataType::Int32 => { + let v: Vec<Option<i32>> = rows.iter().map(|r| cell_as_i64(r, idx).map(|n| n as i32)).collect(); + Ok(Arc::new(Int32Array::from(v))) + } + DataType::Int64 => { + let v: Vec<Option<i64>> = rows.iter().map(|r| cell_as_i64(r, idx)).collect(); + Ok(Arc::new(Int64Array::from(v))) + } + DataType::Float64 => { + let v: Vec<Option<f64>> = rows.iter().map(|r| cell_as_f64(r, idx)).collect(); + Ok(Arc::new(Float64Array::from(v))) + } + _ => { + let v: Vec<Option<String>> = rows.iter().map(|r| cell_as_string(r, idx)).collect(); + Ok(Arc::new(StringArray::from(v))) + } + } +} + +fn cell(r: &Row, idx: usize) -> &Value { r.as_ref(idx).unwrap_or(&Value::NULL) } + +fn cell_as_bool(r: &Row, idx: usize) -> Option<bool> { + match cell(r, idx) { + Value::NULL => None, + Value::Int(n) => Some(*n != 0), + Value::UInt(n) => Some(*n != 0), + Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| { + match s.to_ascii_lowercase().as_str() { + "true" | "1" | "y" | "yes" => Some(true), + "false" | "0" | "n" | "no" => Some(false), + _ => None, + } + }), + _ => None, + } +} + +fn cell_as_i64(r: &Row, idx: usize) -> Option<i64> { + match cell(r, idx) { + Value::NULL => None, + Value::Int(n) => Some(*n), + Value::UInt(n) => i64::try_from(*n).ok(), + Value::Float(f) => Some(*f as i64), + Value::Double(f) => Some(*f as i64), + Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| s.parse().ok()), + _ => None, + } +} + +fn cell_as_f64(r: &Row, idx: usize) -> Option<f64> { + match cell(r, idx) { + Value::NULL => None, + Value::Int(n) => Some(*n as f64), + Value::UInt(n) => Some(*n as f64), + Value::Float(f) => Some(*f as f64), + Value::Double(f) => Some(*f), + Value::Bytes(b) => std::str::from_utf8(b).ok().and_then(|s| s.parse().ok()), + _ => None, + } +} + +fn cell_as_string(r: &Row, idx: usize) -> Option<String> { + match cell(r, idx) { + Value::NULL => None, + Value::Bytes(b) => Some(String::from_utf8_lossy(b).into_owned()), + Value::Int(n) => Some(n.to_string()), + Value::UInt(n) => Some(n.to_string()), + Value::Float(f) => Some(f.to_string()), + Value::Double(f) => Some(f.to_string()), + Value::Date(y, mo, d, h, mi, s, _us) => { + Some(format!("{:04}-{:02}-{:02} {:02}:{:02}:{:02}", y, mo, d, h, mi, s)) + } + Value::Time(neg, days, h, mi, s, _us) => { + let sign = if *neg { "-" } else { "" }; + Some(format!("{}{}d {:02}:{:02}:{:02}", sign, days, h, mi, s)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_dsn_full() { + let c = parse_dsn("mysql://daisy:secret@db.example.com:3307/my_db").unwrap(); + assert_eq!(c.host, "db.example.com"); + assert_eq!(c.port, 3307); + assert_eq!(c.user, "daisy"); + assert_eq!(c.password, "secret"); + assert_eq!(c.database, "my_db"); + } + + #[test] + fn parse_dsn_default_port() { + let c = parse_dsn("mysql://root@localhost/shop").unwrap(); + assert_eq!(c.port, 3306); + assert_eq!(c.user, "root"); + assert_eq!(c.password, ""); + } + + #[test] + fn parse_dsn_no_auth() { + let c = parse_dsn("mysql://127.0.0.1:3306/analytics").unwrap(); + assert_eq!(c.user, "root"); + assert_eq!(c.host, "127.0.0.1"); + } + + #[test] + fn parse_dsn_rejects_non_mysql() { + assert!(parse_dsn("postgresql://host/db").is_err()); + } + + #[test] + fn parse_dsn_requires_db() { + assert!(parse_dsn("mysql://localhost:3306").is_err()); + } + + #[test] + fn type_map_basics() { + assert_eq!(mysql_type_to_arrow("int"), DataType::Int32); + assert_eq!(mysql_type_to_arrow("BIGINT"), DataType::Int64); + assert_eq!(mysql_type_to_arrow("decimal"), DataType::Float64); + assert_eq!(mysql_type_to_arrow("varchar"), DataType::Utf8); + assert_eq!(mysql_type_to_arrow("json"), DataType::Utf8); + assert_eq!(mysql_type_to_arrow("bool"), DataType::Boolean); + } +}
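A sketch of driving the streamer directly, outside the HTTP handler added below. The DSN, table, and `sample_orders` are placeholders; only `MyStreamRequest` and `stream_table_to_parquet` come from this file.

```rust
// Minimal usage sketch, assuming a reachable MySQL instance.
async fn sample_orders() -> Result<(), String> {
    let req = my_stream::MyStreamRequest {
        dsn: "mysql://root:secret@localhost:3306/shop".into(),
        table: "orders".into(),
        dataset_name: Some("orders".into()),
        batch_size: Some(10_000),
        order_by: Some("id".into()),
        limit: Some(100_000), // hard cap: this is a preview run
    };
    let (parquet, stats) = my_stream::stream_table_to_parquet(&req).await?;
    tracing::info!(
        "{} rows -> {} parquet bytes in {:.1}s",
        stats.rows, parquet.len(), stats.duration_secs,
    );
    Ok(())
}
```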
diff --git a/crates/ingestd/src/service.rs b/crates/ingestd/src/service.rs index 3dcef67..90a8c80 100644 --- a/crates/ingestd/src/service.rs +++ b/crates/ingestd/src/service.rs @@ -11,7 +11,7 @@ use serde::Deserialize; use std::sync::Arc; use catalogd::registry::Registry; -use crate::{db_ingest, pg_stream, pipeline}; +use crate::{db_ingest, my_stream, pg_stream, pipeline}; use shared::arrow_helpers::record_batch_to_parquet; use shared::types::{ObjectRef, SchemaFingerprint}; use storaged::ops; @@ -23,6 +23,36 @@ pub struct IngestState { pub registry: Registry, /// Federation layer 2: lookup target bucket from request headers. pub buckets: Arc<storaged::registry::BucketRegistry>, + /// Phase 16.5: when ingest marks a dataset's embeddings stale, push + /// a `DatasetAppended` trigger so the autotune agent can schedule + /// re-trials. Holds the agent handle (no-op when agent disabled). + pub agent_handle: vectord::agent::AgentHandle, + /// Used to look up which HNSW indexes are attached to the + /// just-ingested dataset. Each matching index gets one trigger. + pub index_registry: vectord::index_registry::IndexRegistry, +} + +/// Push `DatasetAppended` triggers for every HNSW index bound to this +/// dataset. Called after a successful ingest. Logs failures but never +/// fails the ingest — the agent is best-effort, not load-bearing. +async fn notify_agent_on_append(state: &IngestState, dataset_name: &str) { + let indexes = state.index_registry.list(Some(dataset_name), None).await; + if indexes.is_empty() { + return; + } + for meta in indexes { + let event = vectord::agent::TriggerEvent::dataset_appended( + meta.index_name.clone(), dataset_name, + ); + if let Err(e) = state.agent_handle.enqueue(event).await { + tracing::warn!("agent enqueue failed for '{}': {}", meta.index_name, e); + } else { + tracing::info!( + "agent: enqueued DatasetAppended({}) for index '{}'", + dataset_name, meta.index_name, + ); + } + } } /// Resolve the target bucket from `X-Lakehouse-Bucket` header. @@ -55,6 +85,7 @@ pub fn router(state: IngestState) -> Router { .route("/postgres/tables", post(list_pg_tables)) .route("/postgres/import", post(import_pg_table)) .route("/db", post(ingest_db_stream)) + .route("/mysql", post(ingest_mysql_stream)) .with_state(state) } @@ -94,6 +125,9 @@ async fn ingest_file( match pipeline::ingest_file_to_bucket(&filename, &content, dataset_name, &bucket, &store, &state.registry).await { Ok(result) => { + if !result.deduplicated { + notify_agent_on_append(&state, &result.dataset_name).await; + } if result.deduplicated { Ok((StatusCode::OK, Json(result))) } else { @@ -309,6 +343,9 @@ async fn ingest_db_stream( // index. No-op for newly-created datasets. let _ = state.registry.mark_embeddings_stale(&dataset_name).await; + // Phase 16.5: notify autotune agent. No-op if no indexes attached. + notify_agent_on_append(&state, &dataset_name).await; + Ok((StatusCode::CREATED, Json(serde_json::json!({ "dataset_name": dataset_name, "table": stream_result.table, @@ -322,17 +359,119 @@ })))) } -/// Redact the password in a postgresql:// DSN for logging / lineage.
-fn redact_dsn(dsn: &str) -> String { - // Simple approach: find @...: and replace password between : and @. - if let Some(at_idx) = dsn.rfind('@') { - let prefix = &dsn[..at_idx]; - if let Some(colon_idx) = prefix.rfind(':') { - // But only if the colon is after the scheme colons (postgresql://) - if colon_idx > "postgresql://".len() { - return format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..]); - } - } +/// Streaming MySQL ingest. Mirrors `ingest_db_stream` — same pagination + +/// Parquet-footer schema recovery + PII/lineage metadata. Different driver, +/// otherwise identical. +async fn ingest_mysql_stream( + State(state): State<IngestState>, + headers: HeaderMap, + Json(req): Json<my_stream::MyStreamRequest>, +) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> { + let (bucket, store) = resolve_bucket(&headers, &state.buckets)?; + tracing::info!( + "mysql stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}", + req.table, req.dataset_name, bucket, req.batch_size, + ); + + let (parquet, stream_result) = my_stream::stream_table_to_parquet(&req) + .await + .map_err(|e| (StatusCode::BAD_GATEWAY, e))?; + + if stream_result.rows == 0 { + return Ok((StatusCode::OK, Json(serde_json::json!({ + "table": req.table, + "rows": 0, + "message": "table is empty", + })))); } - dsn.to_string() + + let (schema, _) = shared::arrow_helpers::parquet_to_record_batches(&parquet) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("reparse parquet: {e}")))?; + + let dataset_name = req.dataset_name.clone().unwrap_or_else(|| req.table.clone()); + let storage_key = format!("datasets/{}.parquet", dataset_name); + let size_bytes = parquet.len() as u64; + + ops::put(&store, &storage_key, parquet) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; + + let schema_fp = shared::arrow_helpers::fingerprint_schema(&schema); + let now = chrono::Utc::now(); + state.registry.register( + dataset_name.clone(), + SchemaFingerprint(schema_fp.0), + vec![ObjectRef { + bucket: bucket.clone(), + key: storage_key.clone(), + size_bytes, + created_at: now, + }], + ).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; + + let col_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + let sensitivity = shared::pii::detect_dataset_sensitivity(&col_names); + let columns: Vec<shared::types::ColumnMeta> = schema.fields().iter().map(|f| { + let sens = shared::pii::detect_sensitivity(f.name()); + shared::types::ColumnMeta { + name: f.name().clone(), + data_type: f.data_type().to_string(), + sensitivity: sens.clone(), + description: String::new(), + is_pii: matches!(sens, Some(shared::types::Sensitivity::Pii)), + } + }).collect(); + + let lineage = shared::types::Lineage { + source_system: "mysql".to_string(), + source_file: format!("dsn: {}", redact_dsn(&req.dsn)), + ingest_job: format!("mysql-stream-{}", now.timestamp_millis()), + ingest_timestamp: now, + parent_datasets: vec![], + }; + + let _ = state.registry.update_metadata(&dataset_name, catalogd::registry::MetadataUpdate { + sensitivity, + columns: Some(columns), + lineage: Some(lineage), + row_count: Some(stream_result.rows as u64), + ..Default::default() + }).await; + + // Phase C: mark stale + Phase 16.5: notify agent. + let _ = state.registry.mark_embeddings_stale(&dataset_name).await; + notify_agent_on_append(&state, &dataset_name).await; + + Ok((StatusCode::CREATED, Json(serde_json::json!({ + "dataset_name": dataset_name, + "table": stream_result.table, + "rows": stream_result.rows, + "batches": stream_result.batches, + "columns": stream_result.columns, + "schema": stream_result.schema, + "storage_key": storage_key, + "size_bytes": size_bytes, + "duration_secs": stream_result.duration_secs, + })))) +} + +/// Redact the password in any `scheme://user:pass@host/...` DSN. Works +/// for postgres, mysql, and any other scheme using the same shape. +fn redact_dsn(dsn: &str) -> String { + let scheme_end = match dsn.find("://") { + Some(i) => i + 3, + None => return dsn.to_string(), + }; + let at_idx = match dsn.rfind('@') { + Some(i) if i > scheme_end => i, + _ => return dsn.to_string(), + }; + // Find the password separator (first `:` after scheme, before `@`). + let userpass = &dsn[scheme_end..at_idx]; + let colon_offset = match userpass.find(':') { + Some(i) => i, + None => return dsn.to_string(), // no password in DSN + }; + let colon_idx = scheme_end + colon_offset; + format!("{}:***{}", &dsn[..colon_idx], &dsn[at_idx..]) }
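The generalized redactor's expected behavior, spelled out as assertions. These follow directly from the implementation above; the test name is illustrative, not part of the diff.

```rust
#[test]
fn redact_dsn_examples() {
    // Password present: masked between the first ':' after the scheme
    // and the last '@'.
    assert_eq!(
        redact_dsn("mysql://daisy:secret@db.example.com:3307/my_db"),
        "mysql://daisy:***@db.example.com:3307/my_db",
    );
    // No password separator in the userinfo: returned untouched.
    assert_eq!(
        redact_dsn("mysql://root@localhost/shop"),
        "mysql://root@localhost/shop",
    );
    // Not a URL-shaped DSN at all: returned untouched.
    assert_eq!(
        redact_dsn("host=localhost dbname=shop"),
        "host=localhost dbname=shop",
    );
}
```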
diff --git a/crates/shared/src/config.rs b/crates/shared/src/config.rs index 6cfd325..51d5167 100644 --- a/crates/shared/src/config.rs +++ b/crates/shared/src/config.rs @@ -16,8 +16,46 @@ pub struct Config { pub auth: AuthConfig, #[serde(default)] pub observability: ObservabilityConfig, + #[serde(default)] + pub agent: AgentSettings, } +/// Phase 16.2 — background autotune agent settings. +/// +/// Duplicated from `vectord::agent::AgentConfig` because `shared` can't +/// depend on `vectord` (vectord already depends on shared). The gateway +/// copies these into the vectord config at startup. +#[derive(Debug, Clone, Deserialize)] +pub struct AgentSettings { + #[serde(default)] + pub enabled: bool, + #[serde(default = "default_cycle_interval_secs")] + pub cycle_interval_secs: u64, + #[serde(default = "default_cooldown_secs")] + pub cooldown_between_trials_secs: u64, + #[serde(default = "default_min_recall")] + pub min_recall: f32, + #[serde(default = "default_max_trials_per_hour")] + pub max_trials_per_hour: u32, +} + +impl Default for AgentSettings { + fn default() -> Self { + Self { + enabled: false, + cycle_interval_secs: default_cycle_interval_secs(), + cooldown_between_trials_secs: default_cooldown_secs(), + min_recall: default_min_recall(), + max_trials_per_hour: default_max_trials_per_hour(), + } + } +} + +fn default_cycle_interval_secs() -> u64 { 60 } +fn default_cooldown_secs() -> u64 { 30 } +fn default_min_recall() -> f32 { 0.9 } +fn default_max_trials_per_hour() -> u32 { 30 } + #[derive(Debug, Clone, Deserialize)] pub struct GatewayConfig { #[serde(default = "default_host")] @@ -159,6 +197,7 @@ impl Default for Config { ai: AiConfig::default(), auth: AuthConfig::default(), observability: ObservabilityConfig::default(), + agent: AgentSettings::default(), } } }
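The serde defaults in action: an empty config section deserializes to a disabled agent carrying the documented defaults. Written with serde_json for brevity; the on-disk config format may differ, and the test name is illustrative.

```rust
#[test]
fn agent_settings_defaults() {
    let s: AgentSettings = serde_json::from_str("{}").unwrap();
    assert!(!s.enabled); // opt-in: no surprise background load
    assert_eq!(s.cycle_interval_secs, 60);
    assert_eq!(s.cooldown_between_trials_secs, 30);
    assert_eq!(s.min_recall, 0.9);
    assert_eq!(s.max_trials_per_hour, 30);
}
```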
diff --git a/crates/shared/src/types.rs b/crates/shared/src/types.rs index 9195ee3..ba46012 100644 --- a/crates/shared/src/types.rs +++ b/crates/shared/src/types.rs @@ -255,10 +255,55 @@ pub struct ModelProfile { pub created_at: chrono::DateTime<chrono::Utc>, #[serde(default)] pub created_by: String, + /// Federation layer 2: which bucket this profile's artifacts + /// (trial journals, promotions for its indexes) live in. If set and + /// not yet registered, `POST /vectors/profile/{id}/activate` auto- + /// provisions a local bucket under `storage.profile_root` and + /// registers it before warming indexes. `None` = share the primary + /// bucket (pre-federation behavior). + #[serde(default)] + pub bucket: Option<String>, + /// ADR-019 hybrid: which storage backend serves this profile's + /// indexes. Defaults to Parquet so pre-existing profiles are + /// unchanged. New profiles can opt into Lance for append-heavy / + /// random-access / 5M+ vector workloads. + #[serde(default)] + pub vector_backend: VectorBackend, } fn default_embed_model() -> String { "nomic-embed-text".to_string() } +/// Per-profile / per-index storage backend choice. +/// +/// `Parquet` (default) — vectors as binary-blob in Parquet + in-RAM HNSW +/// for ANN. Best when: +/// - Index ≤ ~5M vectors (fits in RAM) +/// - Search latency dominates (HNSW p50 < 1ms at 100K) +/// - Workload is mostly read, not append +/// +/// `Lance` — vectors as native FixedSizeList in a Lance dataset, IVF_PQ +/// on-disk index. Best when: +/// - Index will exceed the RAM ceiling (5M+ vectors per index) +/// - Heavy random-row access for RAG (Lance: 311us; Parquet: ~35ms full +/// scan per fetch) +/// - Append-heavy ingest (Lance: 0.08s for 10K rows; Parquet: full +/// ~330MB rewrite per change) +/// - Need to rebuild the index frequently (Lance IVF_PQ: 16s; HNSW: 230s +/// at the same scale) +/// +/// Defaults to Parquet for backward compat with pre-ADR-019 indexes. +/// See `docs/ADR-019-vector-storage.md` for the full scorecard. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum VectorBackend { + Parquet, + Lance, +} + +impl Default for VectorBackend { + fn default() -> Self { VectorBackend::Parquet } +} + /// Soft-delete marker (Phase E). /// /// Tombstones live beside the dataset in `_catalog/tombstones/{dataset}/`
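How the backend choice travels over the wire: `rename_all = "lowercase"` plus the `Default` impl means an absent field deserializes to `Parquet`. These assertions follow from the derives above; the test name is illustrative.

```rust
#[test]
fn vector_backend_wire_format() {
    use shared::types::VectorBackend;

    assert_eq!(serde_json::to_string(&VectorBackend::Lance).unwrap(), r#""lance""#);
    let b: VectorBackend = serde_json::from_str(r#""parquet""#).unwrap();
    assert_eq!(b, VectorBackend::Parquet);
    // Pre-ADR-019 records with no `vector_backend` field land here:
    assert_eq!(VectorBackend::default(), VectorBackend::Parquet);
}
```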
diff --git a/crates/storaged/src/federation_service.rs b/crates/storaged/src/federation_service.rs index 08febf7..cece0a2 100644 --- a/crates/storaged/src/federation_service.rs +++ b/crates/storaged/src/federation_service.rs @@ -13,10 +13,11 @@ use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, - routing::{get, put}, + routing::{delete, get, post, put}, }; use chrono::{DateTime, Utc}; use serde::Deserialize; +use shared::config::BucketConfig; use std::sync::Arc; use crate::error_journal::BucketErrorEvent; @@ -24,16 +25,68 @@ use crate::registry::{BucketInfo, BucketRegistry}; pub fn router(registry: Arc<BucketRegistry>) -> Router { Router::new() - .route("/buckets", get(list_buckets)) + .route("/buckets", get(list_buckets).post(add_bucket)) + .route("/buckets/{name}", delete(remove_bucket)) .route("/errors", get(list_errors)) - .route("/errors/flush", axum::routing::post(flush_errors)) - .route("/errors/compact", axum::routing::post(compact_errors)) + .route("/errors/flush", post(flush_errors)) + .route("/errors/compact", post(compact_errors)) .route("/bucket-health", get(get_health)) .route("/buckets/{bucket}/objects/{*key}", put(put_bucket_object)) .route("/buckets/{bucket}/objects/{*key}", get(get_bucket_object)) .with_state(registry) } +/// Provision + register a bucket at runtime. Body is a `BucketConfig`. +/// +/// Federation layer 2: profile buckets (`profile:alice`) and tenant +/// buckets can be added after startup without a service restart. +async fn add_bucket( + State(reg): State<Arc<BucketRegistry>>, + Json(bc): Json<BucketConfig>, +) -> impl IntoResponse { + // Safety net: local backends must land somewhere under the configured + // profile_root (for profile:*) or be otherwise explicitly rooted. + // Refuse empty / missing root to avoid "bucket created at ./" surprises. + if bc.backend == "local" { + match bc.root.as_deref() { + Some(r) if !r.trim().is_empty() => {} + _ => return Err((StatusCode::BAD_REQUEST, + "local bucket requires a non-empty 'root' path".to_string())), + } + } + match reg.add_bucket(bc).await { + Ok(info) => Ok((StatusCode::CREATED, Json(info))), + Err(e) => { + let code = if e.contains("already registered") { + StatusCode::CONFLICT + } else { + StatusCode::BAD_REQUEST + }; + Err((code, e)) + } + } +} + +/// Unregister a bucket. Refused for primary / rescue / unknown names. +async fn remove_bucket( + State(reg): State<Arc<BucketRegistry>>, + Path(name): Path<String>, +) -> impl IntoResponse { + match reg.remove_bucket(&name) { + Ok(()) => Ok((StatusCode::OK, format!("unregistered: {name}"))), + Err(e) => { + let code = if e.contains("cannot remove") { + StatusCode::FORBIDDEN + } else if e.contains("not registered") { + StatusCode::NOT_FOUND + } else { + StatusCode::BAD_REQUEST + }; + Err((code, e)) + } + } +} + async fn list_buckets(State(reg): State<Arc<BucketRegistry>>) -> Json<Vec<BucketInfo>> { Json(reg.list().await) } diff --git a/crates/storaged/src/registry.rs b/crates/storaged/src/registry.rs index e762cb8..36f3b32 100644 --- a/crates/storaged/src/registry.rs +++ b/crates/storaged/src/registry.rs @@ -15,7 +15,7 @@ use serde::Serialize; use shared::config::{BucketConfig, StorageConfig}; use shared::secrets::{BucketCredentials, SecretsProvider}; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use crate::error_journal::{BucketErrorEvent, ErrorJournal}; @@ -55,10 +55,16 @@ pub enum BucketRole { } pub struct BucketRegistry { - buckets: HashMap<String, Arc<BucketEntry>>, + /// RwLock because federation layer 2 adds runtime bucket lifecycle + /// (`POST /storage/buckets` / `DELETE /storage/buckets/{name}`). + /// Almost all accesses are reads — writes happen at provision / + /// deactivate time only. + buckets: RwLock<HashMap<String, Arc<BucketEntry>>>, default: String, rescue: Option<String>, profile_root: String, + /// Held so runtime `add_bucket` can resolve secret_ref handles. + secrets: Arc<dyn SecretsProvider>, journal: ErrorJournal, } @@ -113,10 +119,11 @@ impl BucketRegistry { let _ = journal.load_recent().await; Ok(Self { - buckets, + buckets: RwLock::new(buckets), default: "primary".to_string(), rescue: cfg.rescue_bucket.clone(), profile_root: cfg.profile_root.clone(), + secrets, journal, }) } @@ -125,12 +132,19 @@ pub fn rescue_name(&self) -> Option<&str> { self.rescue.as_deref() } + pub fn profile_root(&self) -> &str { &self.profile_root } + pub fn journal(&self) -> &ErrorJournal { &self.journal } /// Resolve a bucket name to its object store. Existing call sites use /// this as a drop-in replacement for the old single-store pattern. + /// + /// Uses `std::sync::RwLock` — every caller holds the guard for + /// microseconds (clones an Arc and drops), so there's no async- + /// blocking concern. Never hold it across an await. pub fn get(&self, bucket: &str) -> Result<Arc<dyn ObjectStore>, String> { - self.buckets + let guard = self.buckets.read().expect("bucket registry lock poisoned"); + guard .get(bucket) .map(|e| e.store.clone()) .ok_or_else(|| format!("unknown bucket: {bucket}")) @@ -139,22 +153,34 @@ /// The default bucket's store — use for code paths that don't yet know /// about buckets.
pub fn default_store(&self) -> Arc<dyn ObjectStore> { - self.buckets.get(&self.default).unwrap().store.clone() + let guard = self.buckets.read().expect("bucket registry lock poisoned"); + guard.get(&self.default).unwrap().store.clone() + } + + /// True if this bucket name is registered. + pub fn contains(&self, bucket: &str) -> bool { + let guard = self.buckets.read().expect("bucket registry lock poisoned"); + guard.contains_key(bucket) } /// List all registered buckets. Checks reachability by doing a trivial /// `list` with limit 1 on each. pub async fn list(&self) -> Vec<BucketInfo> { - let mut out = Vec::with_capacity(self.buckets.len()); - for (name, entry) in &self.buckets { - let reachable = probe(&entry.store).await; - let role = self.classify(name); - out.push(BucketInfo { - name: name.clone(), - backend: entry.backend.clone(), - reachable, - role, - }); + // Snapshot (name, backend, store_clone) under the lock, then probe + // outside — probing can be slow (network), don't hold the lock. + let snapshot: Vec<(String, String, Arc<dyn ObjectStore>)> = { + let guard = self.buckets.read().expect("bucket registry lock poisoned"); + guard + .iter() + .map(|(n, e)| (n.clone(), e.backend.clone(), e.store.clone())) + .collect() + }; + + let mut out = Vec::with_capacity(snapshot.len()); + for (name, backend, store) in snapshot { + let reachable = probe(&store).await; + let role = self.classify(&name); + out.push(BucketInfo { name, backend, reachable, role }); } out.sort_by(|a, b| a.name.cmp(&b.name)); out } @@ -167,14 +193,69 @@ else { BucketRole::Tenant } } + /// Provision + register a bucket at runtime (federation layer 2). + /// Returns Err if the name is already registered. Local backends get + /// their root directory created automatically. + pub async fn add_bucket(&self, bc: BucketConfig) -> Result<BucketInfo, String> { + if self.contains(&bc.name) { + return Err(format!("bucket '{}' already registered", bc.name)); + } + let store = build_store(&bc, self.secrets.as_ref()).await?; + let reachable = probe(&store).await; + let entry = Arc::new(BucketEntry { + name: bc.name.clone(), + backend: bc.backend.clone(), + store, + config: bc.clone(), + }); + let role = self.classify(&bc.name); + let info = BucketInfo { + name: bc.name.clone(), + backend: bc.backend.clone(), + reachable, + role, + }; + { + let mut guard = self.buckets.write().expect("bucket registry lock poisoned"); + guard.insert(bc.name.clone(), entry); + } + tracing::info!("registered bucket '{}' backend={} reachable={}", + bc.name, bc.backend, reachable); + Ok(info) + } + + /// Unregister a bucket. Refuses to remove `primary` or the configured + /// rescue bucket — those are load-bearing. Caller is responsible for + /// any final flush of bucket-local state before calling this. + pub fn remove_bucket(&self, name: &str) -> Result<(), String> { + if name == self.default { + return Err("cannot remove primary bucket".into()); + } + if Some(name) == self.rescue.as_deref() { + return Err(format!("cannot remove rescue bucket '{name}'")); + } + let removed = { + let mut guard = self.buckets.write().expect("bucket registry lock poisoned"); + guard.remove(name) + }; + if removed.is_none() { + return Err(format!("bucket '{name}' not registered")); + } + tracing::info!("unregistered bucket '{}'", name); + Ok(()) + } + /// Read with rescue-bucket fallback. If the target bucket fails and a /// rescue is configured, retries against rescue. Records every failure /// in the error journal.
pub async fn read_smart(&self, bucket: &str, key: &str) -> Result<ReadOutcome, String> { - let target = self.buckets.get(bucket) - .ok_or_else(|| format!("unknown bucket: {bucket}"))?; + let target_store = { + let guard = self.buckets.read().expect("bucket registry lock poisoned"); + guard.get(bucket).map(|e| e.store.clone()) + .ok_or_else(|| format!("unknown bucket: {bucket}"))? + }; - match crate::ops::get(&target.store, key).await { + match crate::ops::get(&target_store, key).await { Ok(data) => Ok(ReadOutcome { data, rescued: false, original_bucket: bucket.to_string(), @@ -187,8 +268,12 @@ // Try rescue, if any. if let Some(rescue_name) = &self.rescue { if rescue_name != bucket { - if let Some(rescue) = self.buckets.get(rescue_name) { - match crate::ops::get(&rescue.store, key).await { + let rescue_store = { + let guard = self.buckets.read().expect("bucket registry lock poisoned"); + guard.get(rescue_name).map(|e| e.store.clone()) + }; + if let Some(rescue_store) = rescue_store { + match crate::ops::get(&rescue_store, key).await { Ok(data) => { self.journal.mark_rescued_last(bucket, key).await; return Ok(ReadOutcome { @@ -219,9 +304,12 @@ key: &str, data: bytes::Bytes, ) -> Result<(), String> { - let target = self.buckets.get(bucket) - .ok_or_else(|| format!("unknown bucket: {bucket}"))?; - match crate::ops::put(&target.store, key, data).await { + let target_store = { + let guard = self.buckets.read().expect("bucket registry lock poisoned"); + guard.get(bucket).map(|e| e.store.clone()) + .ok_or_else(|| format!("unknown bucket: {bucket}"))? + }; + match crate::ops::put(&target_store, key, data).await { Ok(()) => Ok(()), Err(err) => { self.journal.append(BucketErrorEvent::new_write(bucket, key, &err)).await;
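A sketch of the runtime lifecycle the new registry methods expose. `BucketConfig`'s full field set isn't visible in this diff, so the struct literal (and its assumed `Default` impl) is illustrative only; `provision_profile_bucket` is a hypothetical caller.

```rust
// Assumption-laden sketch: provision a profile bucket at runtime, then
// tear it down at deactivation. Only add_bucket / remove_bucket come
// from this diff.
async fn provision_profile_bucket(reg: &BucketRegistry) -> Result<(), String> {
    let info = reg.add_bucket(BucketConfig {
        name: "profile:alice".into(),
        backend: "local".into(),
        root: Some("/var/lakehouse/profiles/alice".into()),
        ..Default::default() // assumes BucketConfig: Default
    }).await?;
    assert_eq!(info.name, "profile:alice");

    // Later, at profile deactivation (primary / rescue are refused):
    reg.remove_bucket("profile:alice")?;
    Ok(())
}
```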
crosses the crate boundary. That keeps vectord (Arrow 55) from
+//! picking up an incompatible dep.
+//!
+//! Responsibilities:
+//! - Migrate an existing binary-blob Parquet vector file into a Lance
+//!   dataset with `FixedSizeList`. One-time cost.
+//! - Append new rows natively (no full-file rewrite — Lance's structural win).
+//! - Build an IVF_PQ ANN index on the vector column.
+//! - Vector search (`search`) using the IVF_PQ index when present,
+//!   falling back to full scan otherwise.
+//! - Random-access row fetch by `doc_id` (`get_by_doc_id`) — the lookup
+//!   that Parquet-on-object-store can't cheaply do.
+//! - Cheap count + stats introspection.
+
+use arrow_array::{
+    Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, Int32Array,
+    RecordBatch, RecordBatchIterator, StringArray,
+};
+use arrow_schema::{DataType, Field, Schema};
+use futures::StreamExt;
+use serde::Serialize;
+use std::sync::Arc;
+use std::time::Instant;
+
+// ================= Public types =================
+
+/// One search result. Mirrors vectord's existing `SearchResult` shape
+/// structurally but carries simpler types so this crate stays firewalled.
+#[derive(Debug, Clone, Serialize)]
+pub struct Hit {
+    pub doc_id: String,
+    pub chunk_text: String,
+    pub score: f32,
+    /// Optional — set by search_with_vector, not by index-only search.
+    pub distance: Option<f32>,
+}
+
+/// A fully-hydrated row fetched by `get_by_doc_id` — includes the vector
+/// so callers can do downstream work (rerank, cite, etc.) without a
+/// second round trip.
+#[derive(Debug, Clone, Serialize)]
+pub struct Row {
+    pub doc_id: String,
+    pub chunk_text: String,
+    pub vector: Vec<f32>,
+    pub source: Option<String>,
+    pub chunk_idx: Option<i32>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct MigrationStats {
+    pub rows_written: usize,
+    pub dimensions: usize,
+    pub disk_bytes: u64,
+    pub duration_secs: f32,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct AppendStats {
+    pub rows_appended: usize,
+    pub disk_bytes_added: u64,
+    pub duration_secs: f32,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct IndexStats {
+    pub name: String,
+    pub num_partitions: u32,
+    pub num_bits: u32,
+    pub num_sub_vectors: u32,
+    pub build_time_secs: f32,
+    pub disk_bytes_added: u64,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct DatasetStats {
+    pub path: String,
+    pub rows: usize,
+    pub disk_bytes: u64,
+    pub has_vector_index: bool,
+}
+
+// ================= The backend =================
+
+/// Thin wrapper around a Lance dataset path. Lance handles the heavy
+/// lifting — we just expose a narrow API.
+#[derive(Clone)]
+pub struct LanceVectorStore {
+    /// Local filesystem path or object-store URI (file:///..., s3://...).
+    /// Lance's internal URI parsing handles both.
+    path: String,
+}
+
+impl LanceVectorStore {
+    pub fn new(path: impl Into<String>) -> Self {
+        Self { path: path.into() }
+    }
+
+    pub fn path(&self) -> &str { &self.path }
+
+    /// Row count via Lance's fast metadata path (no scan).
+    pub async fn count(&self) -> Result<usize, String> {
+        let dataset = open_or_err(&self.path).await?;
+        let n = dataset.count_rows(None).await.map_err(e)?;
+        Ok(n)
+    }
+
+    /// True if the on-disk dataset exists AND has at least one `vector`
+    /// column index attached.
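// Illustrative only: what the std-types firewall looks like from the
// vectord side of the boundary. The 768-dim query and the index contents
// are assumptions, not part of the diff.
//
//     async fn demo(store: &LanceVectorStore) -> Result<(), String> {
//         let hits: Vec<Hit> = store.search(&[0.1_f32; 768], 5).await?;
//         for h in hits {
//             // plain String / f32 cross the boundary, never an Arrow type
//             println!("{} score={:.3}", h.doc_id, h.score);
//         }
//         Ok(())
//     }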
+ pub async fn has_vector_index(&self) -> Result { + use lance_index::DatasetIndexExt; + let dataset = match lance::dataset::Dataset::open(&self.path).await { + Ok(d) => d, + Err(_) => return Ok(false), + }; + let indexes = dataset.load_indices().await.map_err(e)?; + Ok(indexes.iter().any(|ix| { + ix.fields.iter().any(|fid| { + dataset.schema().field_by_id(*fid) + .map(|f| f.name == "vector") + .unwrap_or(false) + }) + })) + } + + pub async fn stats(&self) -> Result { + let rows = self.count().await.unwrap_or(0); + let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path)); + let has_vector_index = self.has_vector_index().await.unwrap_or(false); + Ok(DatasetStats { + path: self.path.clone(), + rows, + disk_bytes, + has_vector_index, + }) + } + + /// Migrate a vectord-format Parquet file into a Lance dataset. + /// + /// Input schema (vectord's binary-blob format): + /// - source : Utf8 + /// - doc_id : Utf8 + /// - chunk_idx : Int32 + /// - chunk_text : Utf8 + /// - vector : Binary (raw f32 little-endian bytes) + /// + /// Output schema (Lance-friendly): + /// - source : Utf8 + /// - doc_id : Utf8 + /// - chunk_idx : Int32 + /// - chunk_text : Utf8 + /// - vector : FixedSizeList + /// + /// Idempotent at the file level — if the target exists, it's + /// overwritten. Caller must manage destination paths. + pub async fn migrate_from_parquet_bytes( + &self, + parquet_bytes: &[u8], + ) -> Result { + let t0 = Instant::now(); + let (schema, batches, rows) = read_parquet(parquet_bytes)?; + let dims = detect_vector_dims(&batches)?; + let (new_schema, new_batches) = convert_to_fixed_size_list(&schema, batches, dims)?; + + // Overwrite any prior dataset at this path. + let _ = std::fs::remove_dir_all(&strip_file_uri(&self.path)); + + write_dataset(&self.path, new_schema, new_batches).await?; + let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path)); + + Ok(MigrationStats { + rows_written: rows, + dimensions: dims, + disk_bytes, + duration_secs: t0.elapsed().as_secs_f32(), + }) + } + + /// Native Lance append — does NOT rewrite existing files. New rows + /// land as a separate fragment; readers union across fragments at + /// query time. Contrast: our Parquet path requires rewriting the + /// entire vector file to add rows. 
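// Hedged usage sketch of the append path (all values invented). The four
// parallel Vecs must agree on length, and every vector on dims, or the
// call returns Err before touching the dataset:
//
//     let stats = store.append(
//         Some("ingest-batch".into()),
//         vec!["doc-1".into(), "doc-2".into()],  // doc_ids
//         vec![0, 1],                            // chunk_idxs
//         vec!["first".into(), "second".into()], // chunk_texts
//         vec![vec![0.0; 768], vec![0.0; 768]],  // vectors, equal dims
//     ).await?;
//     assert_eq!(stats.rows_appended, 2);        // lands as a new fragment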
+ pub async fn append( + &self, + source: Option, + doc_ids: Vec, + chunk_idxs: Vec, + chunk_texts: Vec, + vectors: Vec>, + ) -> Result { + let n = doc_ids.len(); + if n == 0 { + return Ok(AppendStats { rows_appended: 0, disk_bytes_added: 0, duration_secs: 0.0 }); + } + if chunk_idxs.len() != n || chunk_texts.len() != n || vectors.len() != n { + return Err(format!( + "append: length mismatch (doc_ids={n}, chunk_idxs={}, chunk_texts={}, vectors={})", + chunk_idxs.len(), chunk_texts.len(), vectors.len(), + )); + } + let dims = vectors[0].len(); + for (i, v) in vectors.iter().enumerate() { + if v.len() != dims { + return Err(format!("append: row {i} has {} dims, expected {}", v.len(), dims)); + } + } + + let t0 = Instant::now(); + let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path)); + + let src_arr = StringArray::from( + (0..n).map(|_| source.clone()).collect::>() + ); + let doc_id_arr = StringArray::from(doc_ids); + let chunk_idx_arr = Int32Array::from(chunk_idxs); + let chunk_text_arr = StringArray::from(chunk_texts); + + let mut flat: Vec = Vec::with_capacity(n * dims); + for v in vectors { flat.extend(v); } + let values = Float32Array::from(flat); + let item_field = Arc::new(Field::new("item", DataType::Float32, true)); + let vec_arr = FixedSizeListArray::try_new( + item_field.clone(), dims as i32, Arc::new(values), None, + ).map_err(e)?; + + let schema = Arc::new(Schema::new(vec![ + Field::new("source", DataType::Utf8, true), + Field::new("doc_id", DataType::Utf8, false), + Field::new("chunk_idx", DataType::Int32, true), + Field::new("chunk_text", DataType::Utf8, true), + Field::new("vector", DataType::FixedSizeList(item_field, dims as i32), false), + ])); + + let arrays: Vec = vec![ + Arc::new(src_arr), Arc::new(doc_id_arr), Arc::new(chunk_idx_arr), + Arc::new(chunk_text_arr), Arc::new(vec_arr), + ]; + let batch = RecordBatch::try_new(schema.clone(), arrays).map_err(e)?; + let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema); + + use lance::dataset::{Dataset, WriteMode, WriteParams}; + let params = WriteParams { mode: WriteMode::Append, ..Default::default() }; + Dataset::write(reader, &self.path, Some(params)).await.map_err(e)?; + + Ok(AppendStats { + rows_appended: n, + disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes), + duration_secs: t0.elapsed().as_secs_f32(), + }) + } + + /// Build an IVF_PQ vector index. Replaces any prior index with the + /// same name. Callers pass explicit params — sensible defaults for + /// ~100K × 768d: num_partitions=316 (≈√N), num_bits=8, num_sub_vectors=48. 
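// The doc-comment defaults above, written out as a helper. A sketch under
// assumptions: the round-to-√N rule and the dims/16 sub-vector split are
// not in the diff; they just reproduce the 100K × 768d example (316, 8, 48).
fn suggested_ivf_pq(num_rows: usize, dims: usize) -> (u32, u32, u32) {
    let num_partitions = (num_rows as f64).sqrt().round() as u32; // 100_000 -> 316
    let num_sub_vectors = (dims / 16) as u32;                     // 768 / 16 = 48, divides dims
    (num_partitions, 8, num_sub_vectors)                          // 8 PQ bits per code
}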
+ pub async fn build_index( + &self, + num_partitions: u32, + num_bits: u32, + num_sub_vectors: u32, + ) -> Result { + use lance::dataset::Dataset; + use lance::index::vector::VectorIndexParams; + use lance_index::{DatasetIndexExt, IndexType}; + use lance_linalg::distance::MetricType; + + let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path)); + let t0 = Instant::now(); + + let mut dataset = Dataset::open(&self.path).await.map_err(e)?; + let params = VectorIndexParams::ivf_pq( + num_partitions as usize, + num_bits as u8, + num_sub_vectors as usize, + MetricType::Cosine, + 50, // max_iterations — same as bench + ); + dataset.create_index( + &["vector"], + IndexType::Vector, + Some("vec_idx".into()), + ¶ms, + true, // replace + ).await.map_err(e)?; + + Ok(IndexStats { + name: "vec_idx".into(), + num_partitions, + num_bits, + num_sub_vectors, + build_time_secs: t0.elapsed().as_secs_f32(), + disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes), + }) + } + + /// Search for top_k nearest neighbors of `query`. Uses the IVF_PQ + /// index if one exists; otherwise does a full scan (slow but + /// correct — useful during development before index build). + pub async fn search(&self, query: &[f32], top_k: usize) -> Result, String> { + use lance::dataset::Dataset; + + let dataset = Dataset::open(&self.path).await.map_err(e)?; + let qarr = Float32Array::from(query.to_vec()); + + let mut scanner = dataset.scan(); + scanner.nearest("vector", &qarr, top_k as usize).map_err(e)?; + scanner.project(&["doc_id", "chunk_text"]).map_err(e)?; + + let mut stream = scanner.try_into_stream().await.map_err(e)?; + let mut hits: Vec = Vec::with_capacity(top_k); + + while let Some(batch) = stream.next().await { + let batch = batch.map_err(e)?; + let doc_ids = batch.column_by_name("doc_id") + .ok_or_else(|| "no doc_id column in search result".to_string())? + .as_any().downcast_ref::() + .ok_or_else(|| "doc_id is not StringArray".to_string())?; + let chunk_texts = batch.column_by_name("chunk_text") + .and_then(|c| c.as_any().downcast_ref::()); + // Lance tacks on a `_distance` column for nearest() queries. + let distances = batch.column_by_name("_distance") + .and_then(|c| c.as_any().downcast_ref::()); + for row in 0..batch.num_rows() { + let d = distances.map(|a| a.value(row)); + hits.push(Hit { + doc_id: doc_ids.value(row).to_string(), + chunk_text: chunk_texts.map(|a| a.value(row).to_string()).unwrap_or_default(), + score: d.map(|d| 1.0 - d).unwrap_or(0.0), // cosine distance → similarity + distance: d, + }); + } + } + Ok(hits) + } + + /// Fetch one row by doc_id. Implementation: Lance filter-pushdown + /// scan — O(1) with partition pruning on a proper btree index, + /// O(N) on vector-only datasets (still far faster than reading + /// the whole Parquet file). We don't build a scalar index on + /// doc_id yet; that's a future optimization. 
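// Usage sketch (doc_id invented). Note the quote-escaping in the filter
// built below: a doc_id like "o'brien-3" must not break the expression.
//
//     if let Some(row) = store.get_by_doc_id("o'brien-3").await? {
//         println!("{} has {} dims", row.doc_id, row.vector.len());
//     }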
+ pub async fn get_by_doc_id(&self, doc_id: &str) -> Result, String> { + use lance::dataset::Dataset; + + let dataset = Dataset::open(&self.path).await.map_err(e)?; + let filter = format!("doc_id = '{}'", doc_id.replace('\'', "''")); + let mut scanner = dataset.scan(); + scanner.filter(&filter).map_err(e)?; + scanner.limit(Some(1), None).map_err(e)?; + let mut stream = scanner.try_into_stream().await.map_err(e)?; + + while let Some(batch) = stream.next().await { + let batch = batch.map_err(e)?; + if batch.num_rows() == 0 { continue; } + return Ok(Some(row_from_batch(&batch, 0)?)); + } + Ok(None) + } +} + +// ================= Internal helpers ================= + +fn e(err: T) -> String { err.to_string() } + +/// `file:///abs/path` → `/abs/path`. Leave other URI schemes as-is for +/// helpers that only work on local paths (dir_size_bytes, remove_dir_all). +fn strip_file_uri(uri: &str) -> String { + uri.strip_prefix("file://").unwrap_or(uri).to_string() +} + +async fn open_or_err(path: &str) -> Result { + lance::dataset::Dataset::open(path).await.map_err(e) +} + +fn dir_size_bytes(path: &str) -> u64 { + fn recurse(p: &std::path::Path) -> u64 { + let Ok(meta) = std::fs::metadata(p) else { return 0; }; + if meta.is_file() { return meta.len(); } + let Ok(entries) = std::fs::read_dir(p) else { return 0; }; + entries.filter_map(|e| e.ok()).map(|e| recurse(&e.path())).sum() + } + recurse(std::path::Path::new(path)) +} + +fn read_parquet(bytes: &[u8]) -> Result<(Arc, Vec, usize), String> { + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::copy_from_slice(bytes)) + .map_err(e)?; + let schema = builder.schema().clone(); + let reader = builder.build().map_err(e)?; + let batches: Vec = reader.collect::>().map_err(e)?; + let rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + Ok((schema, batches, rows)) +} + +fn detect_vector_dims(batches: &[RecordBatch]) -> Result { + for batch in batches { + let idx = batch.schema().index_of("vector") + .map_err(|_| "no 'vector' column".to_string())?; + let col = batch.column(idx); + if let Some(binary) = col.as_any().downcast_ref::() { + for i in 0..binary.len() { + if !binary.is_null(i) { + return Ok(binary.value(i).len() / 4); + } + } + } else if let Some(fsl) = col.as_any().downcast_ref::() { + return Ok(fsl.value_length() as usize); + } + } + Err("could not determine vector dimensions".into()) +} + +fn convert_to_fixed_size_list( + schema: &Arc, + batches: Vec, + dims: usize, +) -> Result<(Arc, Vec), String> { + let new_fields: Vec> = schema + .fields() + .iter() + .map(|f| { + if f.name() == "vector" { + Arc::new(Field::new( + "vector", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + dims as i32, + ), + false, + )) + } else { + f.clone() + } + }) + .collect(); + let new_schema = Arc::new(Schema::new(new_fields)); + + let mut out = Vec::with_capacity(batches.len()); + for batch in batches { + let vec_idx = batch.schema().index_of("vector").map_err(e)?; + let mut new_cols: Vec = Vec::with_capacity(batch.num_columns()); + for (i, col) in batch.columns().iter().enumerate() { + if i == vec_idx { + if let Some(bin) = col.as_any().downcast_ref::() { + new_cols.push(Arc::new(binary_to_fsl(bin, dims)?)); + } else if col.as_any().is::() { + // Already in the right shape — just clone. 
+ new_cols.push(col.clone()); + } else { + return Err("vector column is neither Binary nor FixedSizeList".into()); + } + } else { + new_cols.push(col.clone()); + } + } + out.push(RecordBatch::try_new(new_schema.clone(), new_cols).map_err(e)?); + } + Ok((new_schema, out)) +} + +fn binary_to_fsl(bin: &BinaryArray, dims: usize) -> Result { + let n = bin.len(); + let mut flat: Vec = Vec::with_capacity(n * dims); + for i in 0..n { + if bin.is_null(i) { + flat.extend(std::iter::repeat(0.0).take(dims)); + continue; + } + let b = bin.value(i); + if b.len() != dims * 4 { + return Err(format!( + "row {i}: {} bytes vs expected {} ({} × f32)", + b.len(), dims * 4, dims, + )); + } + for c in b.chunks_exact(4) { + flat.push(f32::from_le_bytes([c[0], c[1], c[2], c[3]])); + } + } + let values = Float32Array::from(flat); + let field = Arc::new(Field::new("item", DataType::Float32, true)); + FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None).map_err(e) +} + +async fn write_dataset( + path: &str, + schema: Arc, + batches: Vec, +) -> Result<(), String> { + use lance::dataset::{Dataset, WriteParams}; + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + Dataset::write(reader, path, Some(WriteParams::default())) + .await.map_err(e)?; + Ok(()) +} + +fn row_from_batch(batch: &RecordBatch, row: usize) -> Result { + let doc_id = batch.column_by_name("doc_id") + .and_then(|c| c.as_any().downcast_ref::()) + .map(|a| a.value(row).to_string()) + .ok_or_else(|| "missing doc_id".to_string())?; + let chunk_text = batch.column_by_name("chunk_text") + .and_then(|c| c.as_any().downcast_ref::()) + .map(|a| a.value(row).to_string()) + .unwrap_or_default(); + let source = batch.column_by_name("source") + .and_then(|c| c.as_any().downcast_ref::()) + .and_then(|a| if a.is_null(row) { None } else { Some(a.value(row).to_string()) }); + let chunk_idx = batch.column_by_name("chunk_idx") + .and_then(|c| c.as_any().downcast_ref::()) + .and_then(|a| if a.is_null(row) { None } else { Some(a.value(row)) }); + + let vec_col = batch.column_by_name("vector") + .ok_or_else(|| "no vector column in row fetch".to_string())?; + let fsl = vec_col.as_any().downcast_ref::() + .ok_or_else(|| "vector column is not FixedSizeList".to_string())?; + let inner = fsl.value(row); + let floats = inner.as_any().downcast_ref::() + .ok_or_else(|| "vector inner not Float32".to_string())?; + let mut v = Vec::with_capacity(floats.len()); + for i in 0..floats.len() { v.push(floats.value(i)); } + + Ok(Row { doc_id, chunk_text, vector: v, source, chunk_idx }) +} diff --git a/crates/vectord/Cargo.toml b/crates/vectord/Cargo.toml index b58e98b..b43abd9 100644 --- a/crates/vectord/Cargo.toml +++ b/crates/vectord/Cargo.toml @@ -8,6 +8,9 @@ shared = { path = "../shared" } storaged = { path = "../storaged" } aibridge = { path = "../aibridge" } catalogd = { path = "../catalogd" } +# ADR-019 firewall — vectord-lance owns its own Arrow 57 / Lance 4 deps. +# Public API uses only std types so no version conflict propagates here. +vectord-lance = { path = "../vectord-lance" } tokio = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/vectord/src/agent.rs b/crates/vectord/src/agent.rs new file mode 100644 index 0000000..64adfe1 --- /dev/null +++ b/crates/vectord/src/agent.rs @@ -0,0 +1,711 @@ +//! Phase 16.2 + 16.5 — The autotune agent. +//! +//! A long-running tokio task that watches the trial journal and +//! autonomously proposes + runs new HNSW configs. Distinct from +//! 
`autotune::run_autotune` which is synchronous (one HTTP call, grid +//! of trials, done). The agent is the continuous version: it sleeps, +//! wakes on triggers, proposes configs based on prior trial history, +//! runs them one at a time, and auto-promotes when it finds an +//! improvement. +//! +//! Design invariants: +//! - Trials are data (ADR-018). Every proposal reads the journal; every +//! attempt appends to it. The journal is the agent's memory. +//! - One trial at a time. Bounded Ollama load — the agent never fires +//! multiple parallel embeddings and respects `cooldown_between_trials_secs`. +//! - Rate-limited. `max_trials_per_hour` is a hard ceiling so a +//! misbehaving proposal function can't saturate the system. +//! - Never promotes below `min_recall`. Same safety gate as +//! `run_autotune` — we will not make the index worse. +//! - Triggered OR periodic. Ingest enqueues a `DatasetAppended` event +//! when a new batch lands; the agent also wakes periodically to keep +//! exploring even when nothing changed externally. +//! - Graceful shutdown via the `stop_tx` signal — the handle's Drop +//! doesn't force-kill, but `stop()` requests a clean exit after the +//! current trial. + +use chrono::{DateTime, Utc}; +use object_store::ObjectStore; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::{Mutex, RwLock, mpsc, oneshot}; + +use aibridge::client::AiClient; +use catalogd::registry::Registry as CatalogRegistry; + +use crate::embedding_cache::EmbeddingCache; +use crate::harness; +use crate::hnsw::HnswStore; +use crate::index_registry::IndexRegistry; +use crate::promotion::{PromotionEntry, PromotionRegistry}; +use crate::trial::{HnswConfig, Trial, TrialJournal, TrialMetrics}; + +// -------- Public-facing types -------- + +/// Runtime configuration for the agent. Mirrored in shared::config under +/// `[agent]`. Defaults are conservative — designed to tune slowly in the +/// background without fighting real workloads for GPU time. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentConfig { + /// Master switch. When false, `spawn` returns a handle but the loop + /// doesn't run. + pub enabled: bool, + /// Periodic wake-up — even if the trigger queue is empty, every N + /// seconds the agent picks an index with trials and proposes one + /// more config. Keeps exploration alive on idle indexes. + pub cycle_interval_secs: u64, + /// Minimum gap between two trials on the SAME index. Prevents the + /// agent from hammering Ollama when a hot index has many pending + /// triggers in a row. + pub cooldown_between_trials_secs: u64, + /// Below this recall, a proposal is never promoted — even if it + /// beats the champion on latency. + pub min_recall: f32, + /// Budget cap: hard ceiling on trials per hour across all indexes. + /// When hit, the agent idles until the hour window rolls. + pub max_trials_per_hour: u32, +} + +impl Default for AgentConfig { + fn default() -> Self { + Self { + enabled: false, // opt-in — don't auto-start until J turns it on + cycle_interval_secs: 60, + cooldown_between_trials_secs: 30, + min_recall: 0.9, + max_trials_per_hour: 30, + } + } +} + +/// What caused the agent to look at a particular index. Recorded in the +/// trial's note field so we can tell "new data arrived" trials from +/// "periodic exploration" trials in the journal. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TriggerReason { + /// Ingest just appended to a dataset that has attached HNSW indexes. 
+ DatasetAppended { dataset: String }, + /// A human or another agent hit `/agent/enqueue/{index}`. + Manual, + /// Periodic wake — no external event, just keep exploring. + Periodic, +} + +/// One unit of work for the agent. +#[derive(Debug, Clone)] +pub struct TriggerEvent { + pub index_name: String, + pub reason: TriggerReason, + pub enqueued_at: DateTime, +} + +impl TriggerEvent { + pub fn manual(index_name: impl Into) -> Self { + Self { index_name: index_name.into(), reason: TriggerReason::Manual, enqueued_at: Utc::now() } + } + pub fn dataset_appended(index_name: impl Into, dataset: impl Into) -> Self { + Self { + index_name: index_name.into(), + reason: TriggerReason::DatasetAppended { dataset: dataset.into() }, + enqueued_at: Utc::now(), + } + } + pub fn periodic(index_name: impl Into) -> Self { + Self { index_name: index_name.into(), reason: TriggerReason::Periodic, enqueued_at: Utc::now() } + } +} + +/// Observable snapshot of the agent's state — what `/agent/status` returns. +#[derive(Debug, Clone, Serialize)] +pub struct AgentStatus { + pub running: bool, + pub config: AgentConfig, + pub queue_depth: usize, + pub trials_run: u64, + pub promotions: u64, + pub trials_in_last_hour: u32, + pub last_event: Option, + pub started_at: Option>, +} + +/// Last thing that happened — useful for "why didn't it do anything?" debugging. +#[derive(Debug, Clone, Serialize)] +pub struct AgentEvent { + pub at: DateTime, + pub kind: String, // "trial_completed" | "promoted" | "skipped_rate_limit" | etc + pub index_name: Option, + pub detail: String, +} + +/// Handle returned by `spawn`. Holds the trigger sender + shared status + +/// stop signal. +#[derive(Clone)] +pub struct AgentHandle { + trigger_tx: mpsc::Sender, + inner: Arc, +} + +struct AgentInner { + status: RwLock, + stop_tx: Mutex>>, + queue_len: Mutex, // mirror of the channel capacity — for status reporting + recent_trials: Mutex>>, // ring of recent trial timestamps for rate limit +} + +impl AgentHandle { + /// Enqueue a trigger. Returns Err if the agent isn't running or the + /// queue is full (backpressure — dropping events is correct here + /// since periodic exploration will pick up the slack). + pub async fn enqueue(&self, event: TriggerEvent) -> Result<(), String> { + self.trigger_tx.try_send(event).map_err(|e| format!("enqueue: {e}"))?; + let mut guard = self.inner.queue_len.lock().await; + *guard = guard.saturating_add(1); + // Update queue_depth in status for observability. + let mut s = self.inner.status.write().await; + s.queue_depth = *guard; + Ok(()) + } + + pub async fn status(&self) -> AgentStatus { + let mut s = self.inner.status.read().await.clone(); + // Refresh rate-limit window from ring buffer. + let cutoff = Utc::now() - chrono::Duration::hours(1); + let ring = self.inner.recent_trials.lock().await; + s.trials_in_last_hour = ring.iter().filter(|t| **t >= cutoff).count() as u32; + s + } + + /// Request a graceful stop. Returns immediately — the loop exits + /// after its current trial completes. + pub async fn stop(&self) -> bool { + let mut guard = self.inner.stop_tx.lock().await; + if let Some(tx) = guard.take() { + let _ = tx.send(()); + true + } else { + false + } + } +} + +// -------- Agent state holder -------- + +/// Everything the agent needs to run a trial. Mirrors the fields of +/// `VectorState` the agent actually uses. Kept separate so the service +/// layer builds it explicitly — no clone of unneeded state. 
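// Wiring sketch (hypothetical startup call site): build the dependency
// bundle below, spawn the loop, keep the handle in shared service state.
//
//     let mut cfg = AgentConfig::default();
//     cfg.enabled = true; // defaults are opt-out; flip explicitly
//     let handle = agent::spawn(cfg, deps.clone());
//     handle.enqueue(TriggerEvent::manual("docs_index")).await.ok();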
+#[derive(Clone)] +pub struct AgentDeps { + pub store: Arc, + pub ai_client: AiClient, + pub catalog: CatalogRegistry, + pub index_registry: IndexRegistry, + pub hnsw_store: HnswStore, + pub embedding_cache: EmbeddingCache, + pub trial_journal: TrialJournal, + pub promotion_registry: PromotionRegistry, +} + +// -------- Spawn -------- + +/// Start the agent loop in a background tokio task. Returns a handle +/// the caller uses to enqueue events and read status. +pub fn spawn(config: AgentConfig, deps: AgentDeps) -> AgentHandle { + let (trigger_tx, trigger_rx) = mpsc::channel::(256); + let (stop_tx, stop_rx) = oneshot::channel::<()>(); + + let status = AgentStatus { + running: config.enabled, + config: config.clone(), + queue_depth: 0, + trials_run: 0, + promotions: 0, + trials_in_last_hour: 0, + last_event: None, + started_at: if config.enabled { Some(Utc::now()) } else { None }, + }; + + let inner = Arc::new(AgentInner { + status: RwLock::new(status), + stop_tx: Mutex::new(Some(stop_tx)), + queue_len: Mutex::new(0), + recent_trials: Mutex::new(VecDeque::with_capacity(64)), + }); + + if config.enabled { + tracing::info!( + "autotune agent started (cycle={}s, cooldown={}s, cap={}/hr, min_recall={})", + config.cycle_interval_secs, config.cooldown_between_trials_secs, + config.max_trials_per_hour, config.min_recall, + ); + let loop_inner = inner.clone(); + let loop_deps = deps.clone(); + let loop_config = config.clone(); + tokio::spawn(async move { + run_loop(loop_config, loop_deps, trigger_rx, stop_rx, loop_inner).await; + }); + } else { + // Agent disabled — still drain the channel so sends don't back up. + tokio::spawn(async move { + let mut rx = trigger_rx; + while rx.recv().await.is_some() {} + }); + tracing::info!("autotune agent configured but disabled (set [agent].enabled=true)"); + } + + AgentHandle { trigger_tx, inner } +} + +// -------- Main loop -------- + +async fn run_loop( + config: AgentConfig, + deps: AgentDeps, + mut trigger_rx: mpsc::Receiver, + mut stop_rx: oneshot::Receiver<()>, + inner: Arc, +) { + let mut periodic = tokio::time::interval(std::time::Duration::from_secs(config.cycle_interval_secs)); + // First tick fires immediately — skip it so we don't double-fire on startup. + periodic.tick().await; + + loop { + let event = tokio::select! { + _ = &mut stop_rx => { + tracing::info!("autotune agent: stop signal received"); + let mut s = inner.status.write().await; + s.running = false; + return; + } + maybe = trigger_rx.recv() => match maybe { + Some(ev) => { + let mut guard = inner.queue_len.lock().await; + *guard = guard.saturating_sub(1); + let mut s = inner.status.write().await; + s.queue_depth = *guard; + ev + } + None => { + tracing::info!("autotune agent: trigger channel closed"); + return; + } + }, + _ = periodic.tick() => { + // Periodic wake — pick an index with existing trials. + // If nothing's been tuned yet there's nothing to propose. + match pick_periodic_target(&deps).await { + Some(idx) => TriggerEvent::periodic(idx), + None => continue, + } + } + }; + + // Rate limit: count recent trials, skip if over budget. + if over_rate_limit(&inner, config.max_trials_per_hour).await { + record_event(&inner, "skipped_rate_limit", Some(&event.index_name), + format!("hit cap of {}/hour", config.max_trials_per_hour)).await; + continue; + } + + // Per-index cooldown. 
+ if cooling_down(&inner, &event.index_name, config.cooldown_between_trials_secs).await { + record_event(&inner, "skipped_cooldown", Some(&event.index_name), + format!("last trial too recent (<{}s)", config.cooldown_between_trials_secs)).await; + continue; + } + + // Run one trial. + match run_one_cycle(&event, &deps, config.min_recall).await { + Ok(outcome) => { + mark_recent_trial(&inner).await; + { + let mut s = inner.status.write().await; + s.trials_run += 1; + if outcome.promoted { s.promotions += 1; } + } + record_event(&inner, if outcome.promoted { "promoted" } else { "trial_completed" }, + Some(&event.index_name), + format!("config=ec{}/es{} recall={:.3} p50={:.0}us {}", + outcome.trial.config.ef_construction, + outcome.trial.config.ef_search, + outcome.trial.metrics.recall_at_k, + outcome.trial.metrics.search_latency_p50_us, + if outcome.promoted { "★ PROMOTED" } else { "" })).await; + } + Err(e) => { + record_event(&inner, "trial_error", Some(&event.index_name), e).await; + } + } + } +} + +/// Result of one cycle — ran a trial, maybe promoted it. +struct CycleOutcome { + trial: Trial, + promoted: bool, +} + +/// Core cycle: propose → build → bench → record → maybe promote. +async fn run_one_cycle( + event: &TriggerEvent, + deps: &AgentDeps, + min_recall: f32, +) -> Result { + // Read history. + let history = deps.trial_journal.list(&event.index_name).await + .map_err(|e| format!("read journal: {e}"))?; + if history.is_empty() { + return Err(format!( + "no trials yet for '{}' — seed with at least one POST /hnsw/trial first", + event.index_name, + )); + } + + // Current champion (if any) is the promoted config. + let champion = deps.promotion_registry.get_current(&event.index_name).await; + let champion_trial = champion.as_ref().and_then(|p| { + history.iter().find(|t| t.id == p.trial_id).cloned() + }); + + // Propose the next config. + let Some(next_config) = propose_next_config(&history, champion_trial.as_ref()) else { + return Err("proposer returned None — search space exhausted".into()); + }; + + // Validate bounds defensively. + if !(10..=400).contains(&next_config.ef_construction) { + return Err(format!("proposed ef_construction={} out of bounds", next_config.ef_construction)); + } + if !(10..=200).contains(&next_config.ef_search) { + return Err(format!("proposed ef_search={} out of bounds", next_config.ef_search)); + } + + // Need a harness to measure. Use the most recent one in history. + // (A future refinement: remember per-index "canonical harness" on + // the index metadata. For now: latest wins.) + let harness_name = history.last().unwrap().eval_set.clone(); + let mut harness_set = harness::EvalSet::load(&deps.store, &harness_name).await + .map_err(|e| format!("load harness '{harness_name}': {e}"))?; + + let embeddings = deps.embedding_cache.get_or_load(&event.index_name).await + .map_err(|e| format!("embeddings: {e}"))?; + + if !harness_set.ground_truth_built { + harness::compute_ground_truth(&mut harness_set, &embeddings, &deps.ai_client).await + .map_err(|e| format!("ground truth: {e}"))?; + harness_set.save(&deps.store).await.ok(); + } + + // Build + bench. 
+ let trial_id = Trial::new_id(); + let slot = format!("{}__{}", event.index_name, trial_id); + let build = deps.hnsw_store + .build_index_with_config(&slot, (*embeddings).clone(), &next_config) + .await?; + + let query_vectors: Vec> = harness_set.queries + .iter().filter_map(|q| q.query_embedding.clone()).collect(); + let bench = deps.hnsw_store.bench_search(&slot, &query_vectors, harness_set.k).await?; + + let mut recalls = Vec::with_capacity(harness_set.queries.len()); + for (q, hits) in harness_set.queries.iter().zip(bench.retrieved.iter()) { + if let Some(gt) = &q.ground_truth { + recalls.push(harness::recall_at_k(hits, gt, harness_set.k)); + } + } + let mean_recall = if recalls.is_empty() { 0.0 } else { + recalls.iter().sum::() / recalls.len() as f32 + }; + + let mut lats = bench.latencies_us.clone(); + lats.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + let p = |pct: f32| -> f32 { + if lats.is_empty() { return 0.0; } + let idx = ((lats.len() as f32 - 1.0) * pct).round() as usize; + lats[idx.min(lats.len() - 1)] + }; + + let dims = embeddings.first().map(|e| e.vector.len()).unwrap_or(0); + let memory_bytes = (embeddings.len() * dims * std::mem::size_of::() + embeddings.len() * 128) as u64; + + let note = match &event.reason { + TriggerReason::DatasetAppended { dataset } => format!("agent: dataset_appended({dataset})"), + TriggerReason::Manual => "agent: manual".to_string(), + TriggerReason::Periodic => "agent: periodic".to_string(), + }; + + let trial = Trial { + id: trial_id, + index_name: event.index_name.clone(), + eval_set: harness_set.name.clone(), + config: next_config.clone(), + metrics: TrialMetrics { + build_time_secs: build.build_time_secs, + search_latency_p50_us: p(0.50), + search_latency_p95_us: p(0.95), + search_latency_p99_us: p(0.99), + recall_at_k: mean_recall, + memory_bytes, + vectors: build.vectors, + eval_queries: harness_set.queries.len(), + brute_force_latency_us: 0.0, + }, + created_at: Utc::now(), + note: Some(note), + }; + deps.trial_journal.append(&trial).await.ok(); + deps.hnsw_store.drop(&slot).await; + + // Promotion decision: the new trial must meet recall gate AND beat + // the current champion (higher recall OR same recall + lower p50). + let promoted = if trial.metrics.recall_at_k < min_recall { + false + } else { + let beats = match &champion_trial { + None => true, // no champion yet — anything passing the gate wins + Some(c) => beats_champion(&trial, c), + }; + if beats { + let entry = PromotionEntry { + config: trial.config.clone(), + trial_id: trial.id.clone(), + promoted_at: Utc::now(), + promoted_by: "agent".to_string(), + note: Some(format!( + "auto-promote: recall={:.3} p50={:.0}us (was {:.3}/{:.0}us)", + trial.metrics.recall_at_k, trial.metrics.search_latency_p50_us, + champion_trial.as_ref().map(|t| t.metrics.recall_at_k).unwrap_or(0.0), + champion_trial.as_ref().map(|t| t.metrics.search_latency_p50_us).unwrap_or(0.0), + )), + }; + deps.promotion_registry.promote(&event.index_name, entry).await.is_ok() + } else { + false + } + }; + + Ok(CycleOutcome { trial, promoted }) +} + +/// Champion-beat test: strictly higher recall, OR equal recall with +/// lower p50. Same rule as autotune::pick_winner — kept consistent so the +/// agent and the synchronous autotune agree on what "better" means. 
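// Worked instances of the rule (values invented), against a champion at
// recall 0.990 / p50 500us:
//   candidate (0.993, 900us) wins:  recall strictly higher
//   candidate (0.990, 410us) wins:  recall tied within 1e-4, lower p50
//   candidate (0.988, 100us) loses: the recall gate dominates; latency
//                                   cannot buy back lost recall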
+fn beats_champion(candidate: &Trial, champion: &Trial) -> bool { + if candidate.metrics.recall_at_k > champion.metrics.recall_at_k { + return true; + } + if (candidate.metrics.recall_at_k - champion.metrics.recall_at_k).abs() < 1e-4 + && candidate.metrics.search_latency_p50_us < champion.metrics.search_latency_p50_us { + return true; + } + false +} + +/// Propose the next HnswConfig given trial history and the current +/// champion. +/// +/// ============================================================ +/// J: THIS IS YOURS TO IMPLEMENT +/// ============================================================ +/// +/// Inputs: +/// - `history`: every trial ever run on this index, oldest first +/// - `champion`: the currently-promoted trial, if any +/// +/// Output: +/// - `Some(HnswConfig)` with the config to try next +/// - `None` if you decide there's nothing worth trying (agent logs +/// "search space exhausted" and moves on) +/// +/// Hard bounds the caller enforces: ef_construction ∈ [10, 400], +/// ef_search ∈ [10, 200]. Stay inside those — configs outside get +/// rejected and count as a wasted cycle. +/// +/// Design options (pick one, or mix): +/// +/// 1. LOCAL REFINEMENT (exploit-heavy): +/// Sample near champion ± small delta. Converges fast, risks local +/// minima. Good for "we know roughly where the sweet spot is." +/// +/// 2. ε-GREEDY (mixed): +/// With prob ε, random sample from full bounds (explore). Otherwise +/// refinement around champion (exploit). ε=0.2 is a reasonable start. +/// Good for long-running tune with no prior knowledge. +/// +/// 3. COARSE→FINE (annealed): +/// First N trials: wide random. Then shrink the neighborhood around +/// champion as more trials accumulate. Mimics simulated annealing. +/// +/// 4. DEDUP-AWARE: +/// Whatever strategy, skip configs already in history. Prevents the +/// agent from re-running the same (ec, es) pair twice. +/// +/// A simple starter implementation is provided below (local refinement +/// + dedup). Replace with your preferred strategy. +pub fn propose_next_config(history: &[Trial], champion: Option<&Trial>) -> Option { + // ε-greedy around the champion, dedup-aware. + // + // - With probability ε (≈0.25), sample a random config from the full + // bounds. Keeps exploration alive so we don't get stuck hill-climbing + // one axis. + // - Otherwise: perturb the champion symmetrically on BOTH axes (not + // just +20 / +40 like the starter did). Prefers small moves first + // so recall stays near the current level. + // - Always skip configs already in history — no point re-running. + // - Deterministic per-history: RNG is seeded from history length so + // the same journal state always proposes the same next config. + // Makes tests + offline replay reproducible. + + let base = champion + .map(|t| t.config.clone()) + .or_else(|| history.last().map(|t| t.config.clone())) + .unwrap_or_default(); + + let tried = |ec: usize, es: usize| -> bool { + history.iter().any(|t| + t.config.ef_construction == ec && t.config.ef_search == es + ) + }; + let clamp = |ec: i32, es: i32| -> (usize, usize) { + (ec.clamp(10, 400) as usize, es.clamp(10, 200) as usize) + }; + + // Tiny xorshift — no rand crate dep. Seeded from history length so the + // proposer is deterministic for a given journal state. 
+    let mut rng = (history.len() as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1;
+    let mut next_u = || {
+        rng ^= rng << 13; rng ^= rng >> 7; rng ^= rng << 17;
+        rng
+    };
+
+    for _attempt in 0..32 {
+        let explore = (next_u() % 100) < 25; // ε = 0.25
+        let (ec, es) = if explore {
+            clamp(10 + (next_u() % 391) as i32, 10 + (next_u() % 191) as i32)
+        } else {
+            // Symmetric perturbation — signed steps on both axes.
+            let dec = [-40, -20, -10, 10, 20, 40][(next_u() % 6) as usize];
+            let des = [-20, -10, -5, 5, 10, 20][(next_u() % 6) as usize];
+            clamp(base.ef_construction as i32 + dec, base.ef_search as i32 + des)
+        };
+        if !tried(ec, es) {
+            return Some(HnswConfig { ef_construction: ec, ef_search: es, seed: Some(42) });
+        }
+    }
+    None // 32 attempts all landed on duplicates — likely saturated
+}
+
+// -------- Helpers --------
+
+/// Find an index to poke on a periodic wake. Strategy: the index with the
+/// most recent promotion. If nothing has ever been promoted, return None.
+async fn pick_periodic_target(deps: &AgentDeps) -> Option<String> {
+    // `/agent` runs against any index that has a trial journal. We don't
+    // have a "list all journals" helper, so we derive candidates from the
+    // promotion registry (indexes that have ever had a promotion are live).
+    let promos = deps.promotion_registry.list_all().await.ok()?;
+    // Prefer the one most recently promoted — it's the one a human cares
+    // about right now.
+    promos.into_iter()
+        .filter_map(|f| f.current.map(|c| (f.index_name, c.promoted_at)))
+        .max_by_key(|(_, at)| *at)
+        .map(|(name, _)| name)
+}
+
+async fn over_rate_limit(inner: &Arc<AgentInner>, cap: u32) -> bool {
+    let cutoff = Utc::now() - chrono::Duration::hours(1);
+    let ring = inner.recent_trials.lock().await;
+    ring.iter().filter(|t| **t >= cutoff).count() as u32 >= cap
+}
+
+async fn cooling_down(inner: &Arc<AgentInner>, _index: &str, cooldown_secs: u64) -> bool {
+    // Minimal impl: gate on global most-recent trial rather than per-index.
+    // Per-index cooldown would be easy to add — keep a HashMap of last-trial
+    // time per index — but for Phase 16.2 MVP, global is fine. Ollama is the
+    // shared resource.
+    let ring = inner.recent_trials.lock().await;
+    if let Some(last) = ring.back() {
+        let since = Utc::now().signed_duration_since(*last);
+        return since < chrono::Duration::seconds(cooldown_secs as i64);
+    }
+    false
+}
+
+async fn mark_recent_trial(inner: &Arc<AgentInner>) {
+    let mut ring = inner.recent_trials.lock().await;
+    ring.push_back(Utc::now());
+    // Keep bounded.
+ while ring.len() > 256 { + ring.pop_front(); + } +} + +async fn record_event( + inner: &Arc, + kind: &str, + index: Option<&str>, + detail: String, +) { + tracing::info!("agent: {} {}{}", kind, + index.map(|i| format!("[{i}] ")).unwrap_or_default(), detail); + let mut s = inner.status.write().await; + s.last_event = Some(AgentEvent { + at: Utc::now(), + kind: kind.to_string(), + index_name: index.map(String::from), + detail, + }); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn mk_trial(ec: usize, es: usize, recall: f32, p50: f32) -> Trial { + Trial { + id: format!("t-{ec}-{es}"), + index_name: "test".into(), + eval_set: "eval".into(), + config: HnswConfig { ef_construction: ec, ef_search: es, seed: Some(42) }, + metrics: TrialMetrics { + build_time_secs: 1.0, + search_latency_p50_us: p50, + search_latency_p95_us: p50 * 1.5, + search_latency_p99_us: p50 * 2.0, + recall_at_k: recall, + memory_bytes: 0, vectors: 1000, eval_queries: 10, + brute_force_latency_us: 100.0, + }, + created_at: Utc::now(), + note: None, + } + } + + #[test] + fn propose_skips_duplicates() { + let hist = vec![ + mk_trial(80, 30, 1.0, 500.0), + mk_trial(100, 30, 1.0, 520.0), // ec+20 + ]; + let next = propose_next_config(&hist, Some(&hist[0])).unwrap(); + // ec+20 is taken, so the proposer should skip it. + assert!(next.ef_construction != 100 || next.ef_search != 30); + } + + #[test] + fn beats_champion_strict_recall() { + let champ = mk_trial(80, 30, 0.95, 500.0); + let better_recall = mk_trial(80, 30, 0.99, 600.0); + let worse_recall = mk_trial(80, 30, 0.90, 100.0); + assert!(beats_champion(&better_recall, &champ)); + assert!(!beats_champion(&worse_recall, &champ)); + } + + #[test] + fn beats_champion_same_recall_lower_latency() { + let champ = mk_trial(80, 30, 1.0, 500.0); + let faster = mk_trial(60, 30, 1.0, 400.0); + let slower = mk_trial(60, 30, 1.0, 600.0); + assert!(beats_champion(&faster, &champ)); + assert!(!beats_champion(&slower, &champ)); + } +} diff --git a/crates/vectord/src/index_registry.rs b/crates/vectord/src/index_registry.rs index 7bc3367..621e129 100644 --- a/crates/vectord/src/index_registry.rs +++ b/crates/vectord/src/index_registry.rs @@ -29,8 +29,21 @@ pub struct IndexMeta { pub created_at: DateTime, pub build_time_secs: f32, pub chunks_per_sec: f32, + /// Federation layer 2: which bucket holds this index's artifacts + /// (trial journal + promotion file). Defaults to "primary" for + /// pre-federation indexes — the serde default keeps old metadata + /// files readable without migration. + #[serde(default = "default_bucket")] + pub bucket: String, + /// ADR-019: which physical backend stores this index. `Parquet` + /// means storage_key points at our binary-blob Parquet file; + /// `Lance` means it points at a Lance dataset directory. + #[serde(default)] + pub vector_backend: shared::types::VectorBackend, } +fn default_bucket() -> String { "primary".to_string() } + /// Registry of all vector indexes. #[derive(Clone)] pub struct IndexRegistry { diff --git a/crates/vectord/src/lance_backend.rs b/crates/vectord/src/lance_backend.rs new file mode 100644 index 0000000..9f019c4 --- /dev/null +++ b/crates/vectord/src/lance_backend.rs @@ -0,0 +1,112 @@ +//! ADR-019 hybrid: routing layer for the Lance vector backend. +//! +//! Holds a `LanceRegistry` which maps an index name to its +//! `LanceVectorStore` instance, lazy-creating on first touch. Path +//! resolution: the index's bucket gives us a bucket root; we append +//! `lance/{index_name}` and use that as the dataset URI. For local +//! 
buckets that means a directory under the bucket's root. +//! +//! S3 buckets: in principle Lance accepts s3://... URIs, but the +//! firewall crate (`vectord-lance`) doesn't pull the S3 feature in by +//! default. When we promote profile buckets onto S3 in a future phase, +//! enable that feature and update `lance_uri_for` accordingly. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::RwLock; + +use storaged::registry::BucketRegistry; +use vectord_lance::LanceVectorStore; + +use crate::index_registry::IndexRegistry; + +/// Convert a bucket+index pair into the URI Lance should use as the +/// dataset path. Local-only for MVP; S3 when we wire that backend. +/// +/// Path resolution mirrors lakehouse.toml's convention for local +/// buckets: ./data for primary, ./data/_rescue for rescue, ./data/_testing +/// for testing, ./data/_profiles/{sanitized} for profile:* buckets, and +/// ./data/_buckets/{sanitized} for everything else. Sanitization replaces +/// `:` with `_` so paths are filesystem-safe. +/// +/// Refuses unknown buckets so a typo doesn't silently land Lance data +/// in a directory the rest of the system can't see. +pub fn lance_uri_for( + buckets: &BucketRegistry, + bucket: &str, + index_name: &str, +) -> Result { + if !buckets.contains(bucket) { + return Err(format!("bucket '{bucket}' not registered")); + } + let root: PathBuf = match bucket { + "primary" => PathBuf::from("./data"), + "rescue" => PathBuf::from("./data/_rescue"), + "testing" => PathBuf::from("./data/_testing"), + b if b.starts_with("profile:") => { + let safe = b.replace(':', "_"); + PathBuf::from(format!("./data/_profiles/{safe}")) + } + b => PathBuf::from(format!("./data/_buckets/{}", b.replace(':', "_"))), + }; + let dataset_dir = root.join("lance").join(index_name); + // Pre-create the parent so Lance's first write doesn't trip on a + // missing ancestor. Lance handles the dataset directory itself. + let _ = std::fs::create_dir_all(root.join("lance")); + // Canonicalize after the parent is guaranteed to exist; if the + // dataset dir hasn't been created yet, canonicalize the parent and + // tack on the leaf name. + let abs = match std::fs::canonicalize(&root) { + Ok(p) => p.join("lance").join(index_name), + Err(_) => dataset_dir.clone(), + }; + Ok(abs.to_string_lossy().to_string()) +} + +/// Lookup-by-index registry of `LanceVectorStore` handles. One handle +/// per index, lazy-created on first call. Cheap to keep alive — it's +/// just a path string + handle to Lance's metadata cache. +#[derive(Clone)] +pub struct LanceRegistry { + buckets: Arc, + indexes: IndexRegistry, + stores: Arc>>, +} + +impl LanceRegistry { + pub fn new(buckets: Arc, indexes: IndexRegistry) -> Self { + Self { + buckets, + indexes, + stores: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Get (or lazy-create) the LanceVectorStore for an index. + /// Resolves bucket → URI via the registry; backend stays whatever + /// IndexMeta says (no enforcement here — caller decides whether to + /// route to Lance based on profile/index backend choice). 
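// Usage sketch (index name invented): the first call resolves bucket to
// URI and caches the handle; later calls are a read-lock map hit.
//
//     let store = state.lance.store_for("docs").await?;
//     let stats = store.stats().await?;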
+ pub async fn store_for(&self, index_name: &str) -> Result { + if let Some(s) = self.stores.read().await.get(index_name) { + return Ok(s.clone()); + } + let bucket = self.indexes + .get(index_name).await + .map(|m| m.bucket) + .unwrap_or_else(|| "primary".to_string()); + let uri = lance_uri_for(&self.buckets, &bucket, index_name)?; + let store = LanceVectorStore::new(uri); + self.stores.write().await.insert(index_name.to_string(), store.clone()); + Ok(store) + } + + /// For freshly-created indexes that don't have IndexMeta yet — caller + /// supplies the bucket explicitly. + pub async fn store_for_new(&self, index_name: &str, bucket: &str) -> Result { + let uri = lance_uri_for(&self.buckets, bucket, index_name)?; + let store = LanceVectorStore::new(uri); + self.stores.write().await.insert(index_name.to_string(), store.clone()); + Ok(store) + } +} diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs index bacf095..e72c41e 100644 --- a/crates/vectord/src/lib.rs +++ b/crates/vectord/src/lib.rs @@ -1,4 +1,6 @@ +pub mod agent; pub mod autotune; +pub mod lance_backend; pub mod chunker; pub mod embedding_cache; pub mod harness; diff --git a/crates/vectord/src/promotion.rs b/crates/vectord/src/promotion.rs index c7dd4bc..579b7fe 100644 --- a/crates/vectord/src/promotion.rs +++ b/crates/vectord/src/promotion.rs @@ -19,13 +19,14 @@ //! - Agent loop — lives in `vectord::autotune`. use chrono::{DateTime, Utc}; -use object_store::ObjectStore; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use storaged::ops; +use storaged::registry::BucketRegistry; use tokio::sync::RwLock; +use crate::index_registry::IndexRegistry; use crate::trial::HnswConfig; const PROMOTION_PREFIX: &str = "_hnsw_promotions"; @@ -55,14 +56,16 @@ pub struct PromotionFile { #[derive(Clone)] pub struct PromotionRegistry { - store: Arc, + buckets: Arc, + index_registry: IndexRegistry, cache: Arc>>, } impl PromotionRegistry { - pub fn new(store: Arc) -> Self { + pub fn new(buckets: Arc, index_registry: IndexRegistry) -> Self { Self { - store, + buckets, + index_registry, cache: Arc::new(RwLock::new(HashMap::new())), } } @@ -76,13 +79,26 @@ impl PromotionRegistry { format!("{PROMOTION_PREFIX}/{safe}.json") } + /// Resolve which bucket's store holds this index's promotion file. + /// Same rules as TrialJournal::bucket_for — follows IndexMeta.bucket, + /// defaults to primary when metadata is missing. + async fn store_for(&self, index_name: &str) -> Result, String> { + let bucket = self.index_registry + .get(index_name) + .await + .map(|m| m.bucket) + .unwrap_or_else(|| "primary".to_string()); + self.buckets.get(&bucket) + } + /// Load (and cache) the promotion file for an index. 
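// Caller sketch (index name invented). `load` falls back to an empty
// PromotionFile when none exists yet, so callers never special-case a
// never-promoted index:
//
//     let file = registry.load("docs").await?;
//     if let Some(cur) = &file.current {
//         println!("promoted trial {} at {}", cur.trial_id, cur.promoted_at);
//     }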
pub async fn load(&self, index_name: &str) -> Result { if let Some(cached) = self.cache.read().await.get(index_name) { return Ok(cached.clone()); } + let store = self.store_for(index_name).await?; let key = Self::key(index_name); - let file = match ops::get(&self.store, &key).await { + let file = match ops::get(&store, &key).await { Ok(bytes) => serde_json::from_slice::(&bytes) .map_err(|e| format!("parse promotion file: {e}"))?, Err(_) => PromotionFile { @@ -118,9 +134,10 @@ impl PromotionRegistry { file.current = Some(entry); file.index_name = index_name.to_string(); + let store = self.store_for(index_name).await?; let key = Self::key(index_name); let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?; - ops::put(&self.store, &key, json.into()).await?; + ops::put(&store, &key, json.into()).await?; self.cache.write().await.insert(index_name.to_string(), file.clone()); tracing::info!( @@ -146,9 +163,10 @@ impl PromotionRegistry { file.current = None; } } + let store = self.store_for(index_name).await?; let key = Self::key(index_name); let json = serde_json::to_vec_pretty(&file).map_err(|e| e.to_string())?; - ops::put(&self.store, &key, json.into()).await?; + ops::put(&store, &key, json.into()).await?; self.cache.write().await.insert(index_name.to_string(), file.clone()); tracing::info!("rolled back promotion for '{}'", index_name); Ok(file) @@ -169,19 +187,34 @@ impl PromotionRegistry { } /// List every index that has a promotion recorded (for operator UI). + /// + /// Federation: scans EVERY registered bucket for promotion files. + /// Per-profile buckets each have their own `_hnsw_promotions/` so we + /// aggregate across them. Dedups by index_name — if the same index + /// somehow has promotion files in multiple buckets, the one from the + /// bucket recorded in IndexMeta wins. 
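// The aggregation below in miniature (a sketch, not the full loop): one
// map keyed by index_name, inserted on every hit, so when the same index
// has files in several buckets the copy from the bucket scanned last
// keeps the slot.
//
//     let mut by_name: HashMap<String, PromotionFile> = HashMap::new();
//     for file in files_from_all_buckets {           // hypothetical iterator
//         by_name.insert(file.index_name.clone(), file);
//     }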
pub async fn list_all(&self) -> Result, String> { - let keys = ops::list(&self.store, Some(&format!("{PROMOTION_PREFIX}/"))).await.unwrap_or_default(); - let mut out = Vec::new(); - for key in keys { - if !key.ends_with(".json") { continue; } - let bytes = match ops::get(&self.store, &key).await { - Ok(b) => b, + let bucket_infos = self.buckets.list().await; + let mut by_name: HashMap = HashMap::new(); + + for b in &bucket_infos { + let store = match self.buckets.get(&b.name) { + Ok(s) => s, Err(_) => continue, }; - if let Ok(f) = serde_json::from_slice::(&bytes) { - out.push(f); + let keys = ops::list(&store, Some(&format!("{PROMOTION_PREFIX}/"))) + .await.unwrap_or_default(); + for key in keys { + if !key.ends_with(".json") { continue; } + let bytes = match ops::get(&store, &key).await { + Ok(b) => b, + Err(_) => continue, + }; + if let Ok(f) = serde_json::from_slice::(&bytes) { + by_name.insert(f.index_name.clone(), f); + } } } - Ok(out) + Ok(by_name.into_values().collect()) } } diff --git a/crates/vectord/src/refresh.rs b/crates/vectord/src/refresh.rs index fab6e9c..1e9c62e 100644 --- a/crates/vectord/src/refresh.rs +++ b/crates/vectord/src/refresh.rs @@ -305,6 +305,8 @@ async fn try_update_index_meta( created_at: chrono::Utc::now(), build_time_secs: 0.0, chunks_per_sec: 0.0, + bucket: "primary".to_string(), + vector_backend: shared::types::VectorBackend::Parquet, }; index_registry.register(meta).await } diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index 72c18cc..1c47acb 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -11,7 +11,8 @@ use std::sync::Arc; use aibridge::client::{AiClient, EmbedRequest}; use catalogd::registry::Registry as CatalogRegistry; -use crate::{autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, promotion, rag, refresh, search, store, supervisor, trial}; +use storaged::registry::BucketRegistry; +use crate::{agent, autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, lance_backend, promotion, rag, refresh, search, store, supervisor, trial}; #[derive(Clone)] pub struct VectorState { @@ -27,6 +28,33 @@ pub struct VectorState { pub catalog: CatalogRegistry, /// Phase 16: promoted HNSW configs. Activation + autotune read/write here. pub promotion_registry: promotion::PromotionRegistry, + /// Phase 16.2: handle to the background autotune agent. Always + /// present — if the agent is disabled in config, the handle drops + /// incoming triggers silently. + pub agent_handle: agent::AgentHandle, + /// Phase B (federation layer 2): bucket registry for per-profile + /// bucket auto-provisioning on activation. + pub bucket_registry: Arc, + /// Phase C (two-profile VRAM gate): tracks which profile is currently + /// "active" on the GPU. Singleton — one profile at a time holds its + /// model in VRAM. Swapping profiles with different ollama_name unloads + /// the previous one (keep_alive=0) before preloading the new one. + /// + /// `None` = no profile has been activated this session; any first + /// activation just preloads and takes the slot. + pub active_profile: Arc>>, + /// ADR-019 hybrid: handles to Lance datasets keyed by index name. + /// Lazy-created on first /vectors/lance/* call. + pub lance: lance_backend::LanceRegistry, +} + +/// What the active-profile singleton records. Narrow — we don't need the +/// full ModelProfile here, just enough to know what to unload on swap. 
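// A hypothetical helper naming the swap invariant (not in the diff): an
// unload is needed only when a previous slot holds a DIFFERENT model.
fn needs_unload(prev: Option<&ActiveProfileSlot>, next_ollama_name: &str) -> bool {
    matches!(prev, Some(p) if p.ollama_name != next_ollama_name)
}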
+#[derive(Debug, Clone, Serialize)] +pub struct ActiveProfileSlot { + pub profile_id: String, + pub ollama_name: String, + pub activated_at: chrono::DateTime, } pub fn router(state: VectorState) -> Router { @@ -59,12 +87,26 @@ pub fn router(state: VectorState) -> Router { // Phase 17: profile activation — pre-load caches + HNSW for this // model's bound data. First search after activate is warm. .route("/profile/{id}/activate", post(activate_profile)) + .route("/profile/{id}/deactivate", post(deactivate_profile)) .route("/profile/{id}/search", post(profile_scoped_search)) + // Phase 17 VRAM gate: which profile currently owns the GPU? + .route("/profile/active", get(get_active_profile)) // Phase 16: promotion + autotune .route("/hnsw/promote/{index}/{trial_id}", post(promote_trial)) .route("/hnsw/rollback/{index}", post(rollback_promotion)) .route("/hnsw/promoted/{index}", get(get_promoted)) .route("/hnsw/autotune", post(run_autotune_endpoint)) + // Phase 16.2: background autotune agent + .route("/agent/status", get(agent_status)) + .route("/agent/stop", post(agent_stop)) + .route("/agent/enqueue/{index_name}", post(agent_enqueue)) + // ADR-019: Lance hybrid backend + .route("/lance/migrate/{index_name}", post(lance_migrate)) + .route("/lance/index/{index_name}", post(lance_build_index)) + .route("/lance/search/{index_name}", post(lance_search)) + .route("/lance/doc/{index_name}/{doc_id}", get(lance_get_doc)) + .route("/lance/append/{index_name}", post(lance_append)) + .route("/lance/stats/{index_name}", get(lance_stats)) .with_state(state) } @@ -81,6 +123,11 @@ struct CreateIndexRequest { documents: Vec, chunk_size: Option, overlap: Option, + /// Federation layer 2: optional bucket to hold this index's trial + /// journal + promotion file. Defaults to "primary" — pre-existing + /// clients that don't know about federation keep working unchanged. + #[serde(default)] + bucket: Option, } #[derive(Deserialize)] @@ -117,6 +164,7 @@ async fn create_index( let n_docs = req.documents.len(); let n_chunks = chunks.len(); let index_name = req.index_name.clone(); + let bucket = req.bucket.clone().unwrap_or_else(|| "primary".to_string()); // Create job and return immediately let job_id = state.job_tracker.create(&index_name, n_chunks).await; @@ -157,6 +205,8 @@ async fn create_index( created_at: chrono::Utc::now(), build_time_secs: elapsed, chunks_per_sec: rate, + bucket: bucket.clone(), + vector_backend: shared::types::VectorBackend::Parquet, }; let _ = registry.register(meta).await; @@ -753,6 +803,11 @@ struct ActivateReport { failures: Vec, total_vectors: usize, duration_secs: f32, + /// Phase C: did we successfully preload the Ollama model? + model_preloaded: bool, + /// Phase C: which profile previously held the GPU slot, if any. + /// Useful for observability of the swap. + previous_profile: Option, } #[derive(Serialize)] @@ -788,6 +843,64 @@ async fn activate_profile( let mut failures = Vec::new(); let mut total_vectors = 0usize; + // Phase 17 / C: VRAM-aware swap. If another profile currently holds + // the GPU and uses a DIFFERENT Ollama model than the one being + // activated, unload it first (keep_alive=0). Same-model activations + // skip the unload — no point churning a model that's already loaded. 
+    let previous_slot = {
+        let guard = state.active_profile.read().await;
+        guard.clone()
+    };
+    if let Some(prev) = &previous_slot {
+        if prev.ollama_name != profile.ollama_name {
+            match state.ai_client.unload_model(&prev.ollama_name).await {
+                Ok(_) => tracing::info!(
+                    "profile swap: unloaded '{}' ({} -> {})",
+                    prev.ollama_name, prev.profile_id, profile.id,
+                ),
+                Err(e) => failures.push(format!(
+                    "unload previous model '{}': {e}", prev.ollama_name,
+                )),
+            }
+        }
+    }
+
+    // Federation layer 2: if this profile declares its own bucket and
+    // that bucket isn't registered yet, auto-provision it under the
+    // configured profile_root. This is the moment a "dormant" profile
+    // becomes live — its bucket exists and is readable/writable.
+    if let Some(bucket_name) = profile.bucket.clone() {
+        if !state.bucket_registry.contains(&bucket_name) {
+            let root = format!(
+                "{}/{}",
+                state.bucket_registry.profile_root().trim_end_matches('/'),
+                bucket_name.replace(':', "_"),
+            );
+            let bc = shared::config::BucketConfig {
+                name: bucket_name.clone(),
+                backend: "local".to_string(),
+                root: Some(root.clone()),
+                bucket: None,
+                region: None,
+                endpoint: None,
+                secret_ref: None,
+            };
+            match state.bucket_registry.add_bucket(bc).await {
+                Ok(info) => {
+                    tracing::info!(
+                        "profile '{}' activated bucket '{}' (root={}, reachable={})",
+                        profile.id, bucket_name, root, info.reachable,
+                    );
+                }
+                Err(e) => {
+                    failures.push(format!(
+                        "auto-provision bucket '{}': {}", bucket_name, e,
+                    ));
+                }
+            }
+        }
+    }
+
     let all_indexes = state.index_registry.list(None, None).await;

     for binding in &profile.bound_datasets {
@@ -847,6 +960,30 @@ async fn activate_profile(
         }
     }

+    // Preload the new profile's Ollama model proactively. Same-model
+    // re-activations are cheap (Ollama no-ops if already loaded).
+    let mut model_preloaded = false;
+    match state.ai_client.preload_model(&profile.ollama_name).await {
+        Ok(_) => {
+            model_preloaded = true;
+            tracing::info!("profile '{}' preloaded ollama model '{}'",
+                profile.id, profile.ollama_name);
+        }
+        Err(e) => failures.push(format!(
+            "preload ollama model '{}': {e}", profile.ollama_name,
+        )),
+    }
+
+    // Take the GPU slot.
+    {
+        let mut guard = state.active_profile.write().await;
+        *guard = Some(ActiveProfileSlot {
+            profile_id: profile.id.clone(),
+            ollama_name: profile.ollama_name.clone(),
+            activated_at: chrono::Utc::now(),
+        });
+    }
+
     Ok(Json(ActivateReport {
         profile_id: profile.id,
         ollama_name: profile.ollama_name,
@@ -854,9 +991,52 @@ async fn activate_profile(
         failures,
         total_vectors,
         duration_secs: t0.elapsed().as_secs_f32(),
+        model_preloaded,
+        previous_profile: previous_slot.map(|s| s.profile_id),
     }))
 }

+/// Unload this profile's model and clear the active slot. If the caller
+/// isn't the currently-active profile, the slot is left untouched and
+/// only the unload is attempted.
+async fn deactivate_profile(
+    State(state): State<VectorState>,
+    Path(profile_id): Path<String>,
+) -> impl IntoResponse {
+    let profile = match state.catalog.get_profile(&profile_id).await {
+        Some(p) => p,
+        None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
+    };
+
+    let was_active = {
+        let mut guard = state.active_profile.write().await;
+        match guard.as_ref() {
+            Some(s) if s.profile_id == profile_id => {
+                let prev = s.clone();
+                *guard = None;
+                Some(prev)
+            }
+            _ => None,
+        }
+    };
+
+    // Regardless of whether it held the slot, we still try to unload —
+    // the operator's intent is "get this model out of VRAM."
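+    // The unload outcome is surfaced in the response body ("unloaded",
+    // "unload_error") rather than failing the whole request with a 5xx.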
+    let unload_result = state.ai_client.unload_model(&profile.ollama_name).await;
+
+    Ok(Json(serde_json::json!({
+        "profile_id": profile.id,
+        "ollama_name": profile.ollama_name,
+        "was_active": was_active.is_some(),
+        "unloaded": unload_result.is_ok(),
+        "unload_error": unload_result.err(),
+    })))
+}
+
+async fn get_active_profile(State(state): State<VectorState>) -> impl IntoResponse {
+    let slot = state.active_profile.read().await.clone();
+    Json(slot)
+}
+
 #[derive(Deserialize)]
 struct ProfileSearchRequest {
     index_name: String,
@@ -1013,3 +1193,232 @@ async fn run_autotune_endpoint(
         Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
     }
 }
+
+// --- Phase 16.2: autotune agent endpoints ---
+
+async fn agent_status(State(state): State<VectorState>) -> impl IntoResponse {
+    Json(state.agent_handle.status().await)
+}
+
+async fn agent_stop(State(state): State<VectorState>) -> impl IntoResponse {
+    let stopped = state.agent_handle.stop().await;
+    Json(serde_json::json!({ "stopped": stopped }))
+}
+
+async fn agent_enqueue(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+) -> impl IntoResponse {
+    let event = agent::TriggerEvent::manual(index_name);
+    match state.agent_handle.enqueue(event).await {
+        Ok(()) => Ok(Json(serde_json::json!({ "enqueued": true }))),
+        Err(e) => Err((StatusCode::SERVICE_UNAVAILABLE, e)),
+    }
+}
+
+// --- ADR-019: Lance hybrid backend HTTP surface ---
+//
+// Lance routes operate on the same `index_name` as the Parquet/HNSW path,
+// but materialize the data as a Lance dataset on disk under
+// `{bucket_root}/lance/{index_name}/`. The two backends are independent:
+// you can have an index in both formats simultaneously.
+// `IndexMeta.vector_backend` records which one is canonical for that index.
+
+#[derive(Deserialize)]
+struct LanceMigrateRequest {
+    /// Optional bucket override. Defaults to the bucket recorded in the
+    /// existing IndexMeta (migration 404s if the index doesn't exist).
+    #[serde(default)]
+    bucket: Option<String>,
+}
+
+/// Read the existing Parquet vector file for `index_name` from object
+/// storage, hand the bytes to vectord-lance, return migration stats.
+/// The original Parquet file is left intact — both backends coexist
+/// after migration.
+async fn lance_migrate(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+    Json(req): Json<LanceMigrateRequest>,
+) -> impl IntoResponse {
+    let meta = state.index_registry.get(&index_name).await
+        .ok_or((StatusCode::NOT_FOUND, format!("index not found: {index_name}")))?;
+    let bucket = req.bucket.unwrap_or(meta.bucket.clone());
+
+    // Pull the Parquet bytes via storaged::ops — same path as the
+    // existing embedding loader uses.
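+    // A missing object maps to 404 below so callers can tell "no Parquet
+    // written yet" apart from a bad bucket name (400).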
+    let store = state.bucket_registry.get(&bucket)
+        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
+    let bytes = storaged::ops::get(&store, &meta.storage_key).await
+        .map_err(|e| (StatusCode::NOT_FOUND, format!("read parquet: {e}")))?;
+
+    let lance_store = state.lance.store_for_new(&index_name, &bucket).await
+        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
+
+    let stats = lance_store.migrate_from_parquet_bytes(&bytes).await
+        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
+
+    tracing::info!(
+        "lance migrate '{}': {} rows, {}d, {} bytes on disk, {:.2}s",
+        index_name, stats.rows_written, stats.dimensions,
+        stats.disk_bytes, stats.duration_secs,
+    );
+
+    Ok::<_, (StatusCode, String)>(Json(serde_json::json!({
+        "index_name": index_name,
+        "bucket": bucket,
+        "lance_path": lance_store.path(),
+        "stats": stats,
+    })))
+}
+
+#[derive(Deserialize)]
+struct LanceIndexRequest {
+    #[serde(default = "default_partitions")]
+    num_partitions: u32,
+    #[serde(default = "default_bits")]
+    num_bits: u32,
+    #[serde(default = "default_subvectors")]
+    num_sub_vectors: u32,
+}
+
+fn default_partitions() -> u32 { 316 } // ≈√100K — sane for the reference dataset
+fn default_bits() -> u32 { 8 }
+fn default_subvectors() -> u32 { 48 } // 768/48 = 16 dims per subvector
+
+/// Build the IVF_PQ index on the Lance dataset.
+async fn lance_build_index(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+    Json(req): Json<LanceIndexRequest>,
+) -> impl IntoResponse {
+    let lance_store = state.lance.store_for(&index_name).await
+        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
+    match lance_store.build_index(req.num_partitions, req.num_bits, req.num_sub_vectors).await {
+        Ok(stats) => Ok(Json(stats)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
+#[derive(Deserialize)]
+struct LanceSearchRequest {
+    /// Plain text query — embedded server-side for symmetry with the
+    /// existing /vectors/search path.
+    query: String,
+    #[serde(default = "default_top_k")]
+    top_k: usize,
+}
+
+fn default_top_k() -> usize { 5 }
+
+/// Vector search against a Lance dataset. Embeds the query text via the
+/// sidecar then calls Lance's nearest-neighbor scanner.
+async fn lance_search(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+    Json(req): Json<LanceSearchRequest>,
+) -> impl IntoResponse {
+    let embed_resp = state.ai_client
+        .embed(EmbedRequest { texts: vec![req.query.clone()], model: None })
+        .await
+        .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
+    if embed_resp.embeddings.is_empty() {
+        return Err((StatusCode::BAD_GATEWAY, "no embedding returned".into()));
+    }
+    let qv: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
+
+    let lance_store = state.lance.store_for(&index_name).await
+        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
+
+    let t0 = std::time::Instant::now();
+    let hits = lance_store.search(&qv, req.top_k).await
+        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
+
+    Ok(Json(serde_json::json!({
+        "index_name": index_name,
+        "query": req.query,
+        "method": "lance_ivf_pq",
+        "latency_us": t0.elapsed().as_micros() as u64,
+        "results": hits,
+    })))
+}
+
+/// Random-access fetch by doc_id — the O(1) lookup that's basically
+/// impossible in our Parquet path without scanning the whole file.
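+/// The reported `latency_us` covers only the Lance lookup itself, not
+/// routing or JSON serialization.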
+async fn lance_get_doc(
+    State(state): State<VectorState>,
+    Path((index_name, doc_id)): Path<(String, String)>,
+) -> impl IntoResponse {
+    let lance_store = state.lance.store_for(&index_name).await
+        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
+    let t0 = std::time::Instant::now();
+    match lance_store.get_by_doc_id(&doc_id).await {
+        Ok(Some(row)) => Ok(Json(serde_json::json!({
+            "index_name": index_name,
+            "doc_id": doc_id,
+            "latency_us": t0.elapsed().as_micros() as u64,
+            "row": row,
+        }))),
+        Ok(None) => Err((StatusCode::NOT_FOUND, format!("doc_id not found: {doc_id}"))),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
+#[derive(Deserialize)]
+struct LanceAppendRequest {
+    /// Optional source tag — set on every appended row.
+    #[serde(default)]
+    source: Option<String>,
+    rows: Vec<LanceAppendRow>,
+}
+
+#[derive(Deserialize)]
+struct LanceAppendRow {
+    doc_id: String,
+    #[serde(default)]
+    chunk_idx: Option<u32>,
+    chunk_text: String,
+    /// Pre-computed embedding. Caller is responsible for ensuring it
+    /// matches the dataset's dimensions and embedding model.
+    vector: Vec<f32>,
+}
+
+async fn lance_append(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+    Json(req): Json<LanceAppendRequest>,
+) -> impl IntoResponse {
+    if req.rows.is_empty() {
+        return Err((StatusCode::BAD_REQUEST, "rows array is empty".into()));
+    }
+    let lance_store = state.lance.store_for(&index_name).await
+        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
+
+    let mut doc_ids = Vec::with_capacity(req.rows.len());
+    let mut chunk_idxs = Vec::with_capacity(req.rows.len());
+    let mut chunk_texts = Vec::with_capacity(req.rows.len());
+    let mut vectors = Vec::with_capacity(req.rows.len());
+    for r in req.rows {
+        doc_ids.push(r.doc_id);
+        chunk_idxs.push(r.chunk_idx.unwrap_or(0));
+        chunk_texts.push(r.chunk_text);
+        vectors.push(r.vector);
+    }
+
+    match lance_store.append(req.source, doc_ids, chunk_idxs, chunk_texts, vectors).await {
+        Ok(stats) => Ok(Json(stats)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
+async fn lance_stats(
+    State(state): State<VectorState>,
+    Path(index_name): Path<String>,
+) -> impl IntoResponse {
+    let lance_store = state.lance.store_for(&index_name).await
+        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
+    match lance_store.stats().await {
+        Ok(s) => Ok(Json(s)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
diff --git a/crates/vectord/src/trial.rs b/crates/vectord/src/trial.rs
index 6d601c7..75add9a 100644
--- a/crates/vectord/src/trial.rs
+++ b/crates/vectord/src/trial.rs
@@ -17,6 +17,9 @@ use std::sync::Arc;
 use tokio::sync::RwLock;

 use storaged::append_log::AppendLog;
+use storaged::registry::BucketRegistry;
+
+use crate::index_registry::IndexRegistry;

 /// HNSW build/search parameters the agent can tune.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -86,18 +89,28 @@ impl Trial {
 }

 /// Per-index append log, lazy-created on first write.
+///
+/// Federation layer 2: the journal resolves each index's bucket from the
+/// index registry and writes its JSONL batches to THAT bucket, not
+/// primary. Back-compat is preserved by `IndexMeta::bucket` defaulting
+/// to "primary" for pre-federation indexes. Indexes the registry has
+/// never heard of (edge case — trials run before first register) fall
+/// through to primary as well.
 #[derive(Clone)]
 pub struct TrialJournal {
-    store: Arc,
-    /// Cache per-index AppendLog instances so the in-memory buffer persists
-    /// across calls.
-    logs: Arc<RwLock<HashMap<String, Arc<AppendLog>>>>,
+    buckets: Arc<BucketRegistry>,
+    index_registry: IndexRegistry,
+    /// Cache per (bucket, index) AppendLog so the in-memory buffer persists
+    /// across calls. Keyed by `(bucket, index_name)` so moving an index
+    /// between buckets is clean — the old journal stays intact.
+    logs: Arc<RwLock<HashMap<(String, String), Arc<AppendLog>>>>,
 }

 impl TrialJournal {
-    pub fn new(store: Arc) -> Self {
+    pub fn new(buckets: Arc<BucketRegistry>, index_registry: IndexRegistry) -> Self {
         Self {
-            store,
+            buckets,
+            index_registry,
             logs: Arc::new(RwLock::new(HashMap::new())),
         }
     }
@@ -106,35 +119,49 @@ impl TrialJournal {
         format!("_hnsw_trials/{}", index_name)
     }

-    async fn log_for(&self, index_name: &str) -> Arc<AppendLog> {
-        if let Some(log) = self.logs.read().await.get(index_name) {
-            return log.clone();
+    /// Resolve which bucket holds this index's trial artifacts.
+    /// Falls back to primary for indexes without recorded metadata.
+    async fn bucket_for(&self, index_name: &str) -> String {
+        self.index_registry
+            .get(index_name)
+            .await
+            .map(|m| m.bucket)
+            .unwrap_or_else(|| "primary".to_string())
+    }
+
+    async fn log_for(&self, index_name: &str) -> Result<Arc<AppendLog>, String> {
+        let bucket = self.bucket_for(index_name).await;
+        let key = (bucket.clone(), index_name.to_string());
+
+        if let Some(log) = self.logs.read().await.get(&key) {
+            return Ok(log.clone());
         }
         let mut guard = self.logs.write().await;
-        if let Some(log) = guard.get(index_name) {
-            return log.clone();
+        if let Some(log) = guard.get(&key) {
+            return Ok(log.clone());
         }
+        let store = self.buckets.get(&bucket)?;
         // Trials arrive one at a time during human/agent iteration — a low
         // threshold gives "hit /trials and see my latest attempt" immediacy
         // without creating one file per event.
         let log = Arc::new(
-            AppendLog::new(self.store.clone(), Self::prefix(index_name))
+            AppendLog::new(store, Self::prefix(index_name))
                 .with_flush_threshold(4),
        );
-        guard.insert(index_name.to_string(), log.clone());
-        log
+        guard.insert(key, log.clone());
+        Ok(log)
     }

     /// Append a trial record. In-memory buffered; persisted in batches.
     pub async fn append(&self, trial: &Trial) -> Result<(), String> {
         let line = serde_json::to_vec(trial).map_err(|e| e.to_string())?;
-        let log = self.log_for(&trial.index_name).await;
+        let log = self.log_for(&trial.index_name).await?;
         log.append(line).await
     }

     /// Read all trials for an index (flushed batches + unflushed buffer).
     pub async fn list(&self, index_name: &str) -> Result<Vec<Trial>, String> {
-        let log = self.log_for(index_name).await;
+        let log = self.log_for(index_name).await?;
         let lines = log.read_all().await?;
         let mut trials = Vec::with_capacity(lines.len());
         for line in lines {
@@ -149,13 +176,13 @@ impl TrialJournal {
     /// Explicit flush for callers that want write-through semantics
     /// (e.g. an agent that wants to commit a trial before querying stats).
     pub async fn flush(&self, index_name: &str) -> Result<(), String> {
-        let log = self.log_for(index_name).await;
+        let log = self.log_for(index_name).await?;
         log.flush().await
     }

     /// Compact all batch files for an index into one.
     pub async fn compact(&self, index_name: &str) -> Result<usize, String> {
-        let log = self.log_for(index_name).await;
+        let log = self.log_for(index_name).await?;
         log.compact().await
     }
diff --git a/lakehouse.toml b/lakehouse.toml
index a642dd4..a569b27 100644
--- a/lakehouse.toml
+++ b/lakehouse.toml
@@ -47,3 +47,14 @@ enabled = false
 # Export traces to stdout (set to "otlp" for OpenTelemetry collector)
 exporter = "stdout"
 service_name = "lakehouse"
+
+[agent]
+# Phase 16.2 — background autotune agent. Opt-in: set enabled = true to
+# let the agent continuously propose + trial HNSW configs and auto-promote
+# winners. Defaults are conservative so it stays out of the way of live
+# search traffic on shared Ollama.
+enabled = true
+cycle_interval_secs = 120           # periodic wake if no triggers
+cooldown_between_trials_secs = 10   # min gap between trials
+min_recall = 0.9                    # never promote below this
+max_trials_per_hour = 20            # hard budget cap
diff --git a/sidecar/sidecar/admin.py b/sidecar/sidecar/admin.py
new file mode 100644
index 0000000..4eb235c
--- /dev/null
+++ b/sidecar/sidecar/admin.py
@@ -0,0 +1,137 @@
+"""Admin endpoints: model lifecycle + VRAM introspection.
+
+Phase 17 / Phase C — the VRAM-aware profile swap story. Ollama loads
+models lazily and unloads after a TTL (default 5min). For predictable
+swaps between model profiles, we need explicit control:
+
+- unload: send keep_alive=0 to force immediate unload
+- preload: send keep_alive=5m with a minimal prompt to proactively load
+- ps: query Ollama's /api/ps to see what's currently loaded
+
+All three are thin wrappers over Ollama's own API. No state held here.
+"""
+
+import asyncio
+import shutil
+
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+
+from .ollama import client
+
+router = APIRouter()
+
+
+class ModelRequest(BaseModel):
+    model: str
+
+
+@router.post("/unload")
+async def unload(req: ModelRequest):
+    """Force Ollama to unload a model immediately (keep_alive=0 trick)."""
+    async with client() as c:
+        resp = await c.post(
+            "/api/generate",
+            json={"model": req.model, "prompt": "", "keep_alive": 0, "stream": False},
+        )
+        # Ollama returns 200 even on unload; anything else is abnormal.
+        if resp.status_code != 200:
+            raise HTTPException(502, f"Ollama unload error: {resp.text}")
+        return {"unloaded": req.model}
+
+
+@router.post("/preload")
+async def preload(req: ModelRequest):
+    """Force Ollama to load a model into VRAM and keep it there."""
+    async with client() as c:
+        resp = await c.post(
+            "/api/generate",
+            json={
+                "model": req.model,
+                # "" can confuse some models; a single token is safer and still ~instant.
+                "prompt": " ",
+                "keep_alive": "5m",
+                "stream": False,
+                "options": {"num_predict": 1},
+            },
+        )
+        if resp.status_code != 200:
+            raise HTTPException(502, f"Ollama preload error: {resp.text}")
+        data = resp.json()
+        return {
+            "preloaded": req.model,
+            "load_duration_ns": data.get("load_duration"),
+            "total_duration_ns": data.get("total_duration"),
+        }
+
+
+@router.get("/ps")
+async def list_loaded():
+    """What models does Ollama currently have in VRAM?"""
+    async with client() as c:
+        resp = await c.get("/api/ps")
+        if resp.status_code != 200:
+            raise HTTPException(502, f"Ollama ps error: {resp.text}")
+        data = resp.json()
+    # Flatten Ollama's fields into something small + useful.
+    models = [
+        {
+            "name": m.get("name"),
+            "size_bytes": m.get("size"),
+            "size_vram_bytes": m.get("size_vram"),
+            "expires_at": m.get("expires_at"),
+        }
+        for m in data.get("models", [])
+    ]
+    return {"models": models}
+
+
+@router.get("/vram")
+async def vram_summary():
+    """Combined: nvidia-smi VRAM + Ollama loaded models.
+
+    Shells out to nvidia-smi; if it's not on PATH, returns just the
+    Ollama view. Intentionally async-via-to_thread so the blocking
+    subprocess doesn't stall the event loop.
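+    Ollama reports size_vram in bytes; the summary below floor-divides
+    to MiB.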
+ """ + gpu = None + if shutil.which("nvidia-smi"): + gpu = await asyncio.to_thread(_nvidia_smi_snapshot) + + async with client() as c: + resp = await c.get("/api/ps") + loaded = resp.json().get("models", []) if resp.status_code == 200 else [] + + return { + "gpu": gpu, + "ollama_loaded": [ + { + "name": m.get("name"), + "size_vram_mib": (m.get("size_vram", 0) or 0) // (1024 * 1024), + "expires_at": m.get("expires_at"), + } + for m in loaded + ], + } + + +def _nvidia_smi_snapshot(): + """One-shot nvidia-smi poll. Returns None on failure.""" + import subprocess + try: + out = subprocess.check_output( + [ + "nvidia-smi", + "--query-gpu=memory.used,memory.total,utilization.gpu,name", + "--format=csv,noheader,nounits", + ], + timeout=2, + ).decode().strip() + used_mib, total_mib, util_pct, name = [s.strip() for s in out.split(",")] + return { + "name": name, + "used_mib": int(used_mib), + "total_mib": int(total_mib), + "utilization_pct": int(util_pct), + } + except Exception: + return None diff --git a/sidecar/sidecar/main.py b/sidecar/sidecar/main.py index 9555de2..56fa4d0 100644 --- a/sidecar/sidecar/main.py +++ b/sidecar/sidecar/main.py @@ -2,6 +2,7 @@ import os from fastapi import FastAPI +from .admin import router as admin_router from .embed import router as embed_router from .generate import router as generate_router from .rerank import router as rerank_router @@ -11,6 +12,7 @@ app = FastAPI(title="Lakehouse AI Sidecar") app.include_router(embed_router, prefix="/embed", tags=["embed"]) app.include_router(generate_router, prefix="/generate", tags=["generate"]) app.include_router(rerank_router, prefix="/rerank", tags=["rerank"]) +app.include_router(admin_router, prefix="/admin", tags=["admin"]) @app.get("/health")