use axum::{
    Json, Router,
    extract::{Path, Query, State},
    http::StatusCode,
    response::IntoResponse,
    routing::{get, post},
};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

use aibridge::client::{AiClient, EmbedRequest};
use catalogd::registry::Registry as CatalogRegistry;
use storaged::registry::BucketRegistry;

use crate::{
    agent, autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs,
    lance_backend, promotion, rag, refresh, search, store, supervisor, trial,
};

#[derive(Clone)]
pub struct VectorState {
    pub store: Arc<dyn ObjectStore>,
    pub ai_client: AiClient,
    pub job_tracker: jobs::JobTracker,
    pub index_registry: index_registry::IndexRegistry,
    pub hnsw_store: hnsw::HnswStore,
    pub embedding_cache: embedding_cache::EmbeddingCache,
    pub trial_journal: trial::TrialJournal,
    /// Catalog registry — needed by the Phase C refresh path to mark/clear
    /// staleness and look up dataset manifests.
    pub catalog: CatalogRegistry,
    /// Phase 16: promoted HNSW configs. Activation + autotune read/write here.
    pub promotion_registry: promotion::PromotionRegistry,
    /// Phase 16.2: handle to the background autotune agent. Always
    /// present — if the agent is disabled in config, the handle drops
    /// incoming triggers silently.
    pub agent_handle: agent::AgentHandle,
    /// Phase B (federation layer 2): bucket registry for per-profile
    /// bucket auto-provisioning on activation.
    pub bucket_registry: Arc<BucketRegistry>,
    /// Phase C (two-profile VRAM gate): tracks which profile is currently
    /// "active" on the GPU. Singleton — one profile at a time holds its
    /// model in VRAM. Swapping profiles with different ollama_name unloads
    /// the previous one (keep_alive=0) before preloading the new one.
    ///
    /// `None` = no profile has been activated this session; any first
    /// activation just preloads and takes the slot.
    pub active_profile: Arc<tokio::sync::RwLock<Option<ActiveProfileSlot>>>,
    /// ADR-019 hybrid: handles to Lance datasets keyed by index name.
    /// Lazy-created on first /vectors/lance/* call.
    pub lance: lance_backend::LanceRegistry,
}

/// What the active-profile singleton records. Narrow — we don't need the
/// full ModelProfile here, just enough to know what to unload on swap.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveProfileSlot {
    pub profile_id: String,
    pub ollama_name: String,
    pub activated_at: chrono::DateTime<chrono::Utc>,
}

pub fn router(state: VectorState) -> Router {
    Router::new()
        .route("/health", get(health))
        .route("/index", post(create_index))
        .route("/indexes", get(list_indexes))
        .route("/indexes/{name}", get(get_index_meta))
        .route("/jobs", get(list_jobs))
        .route("/jobs/{id}", get(get_job))
        .route("/search", post(search_index))
        .route("/rag", post(rag_query))
        .route("/hnsw/build", post(build_hnsw))
        .route("/hnsw/search", post(search_hnsw))
        .route("/hnsw/list", get(list_hnsw))
        // Trial system — parameterized tuning loop
        .route("/hnsw/trial", post(run_trial))
        .route("/hnsw/trials/{index_name}", get(list_trials))
        .route("/hnsw/trials/{index_name}/best", get(best_trial))
        // Eval sets
        .route("/hnsw/evals", get(list_evals))
        .route("/hnsw/evals/{name}", get(get_eval).put(put_eval))
        .route("/hnsw/evals/{name}/autogen", post(autogen_eval))
        // Cache management
        .route("/hnsw/cache/stats", get(cache_stats))
        .route("/hnsw/cache/{index_name}", axum::routing::delete(cache_evict))
        // Phase C: embedding refresh
        .route("/refresh/{dataset_name}", post(refresh_dataset))
        .route("/stale", get(list_stale))
        // Phase 17: profile activation — pre-load caches + HNSW for this
        // model's bound data. First search after activate is warm.
        .route("/profile/{id}/activate", post(activate_profile))
        .route("/profile/{id}/deactivate", post(deactivate_profile))
        .route("/profile/{id}/search", post(profile_scoped_search))
        // Phase 17 VRAM gate: which profile currently owns the GPU?
        .route("/profile/active", get(get_active_profile))
        // Phase 16: promotion + autotune
        .route("/hnsw/promote/{index}/{trial_id}", post(promote_trial))
        .route("/hnsw/rollback/{index}", post(rollback_promotion))
        .route("/hnsw/promoted/{index}", get(get_promoted))
        .route("/hnsw/autotune", post(run_autotune_endpoint))
        // Phase 16.2: background autotune agent
        .route("/agent/status", get(agent_status))
        .route("/agent/stop", post(agent_stop))
        .route("/agent/enqueue/{index_name}", post(agent_enqueue))
        // ADR-019: Lance hybrid backend
        .route("/lance/migrate/{index_name}", post(lance_migrate))
        .route("/lance/index/{index_name}", post(lance_build_index))
        .route("/lance/search/{index_name}", post(lance_search))
        .route("/lance/doc/{index_name}/{doc_id}", get(lance_get_doc))
        .route("/lance/append/{index_name}", post(lance_append))
        .route("/lance/stats/{index_name}", get(lance_stats))
        .route("/lance/scalar-index/{index_name}/{column}", post(lance_build_scalar_index))
        .with_state(state)
}

async fn health() -> &'static str {
    "vectord ok"
}
// --- Background Index Creation ---

#[derive(Deserialize)]
struct CreateIndexRequest {
    index_name: String,
    source: String,
    documents: Vec<DocInput>,
    chunk_size: Option<usize>,
    overlap: Option<usize>,
    /// Federation layer 2: optional bucket to hold this index's trial
    /// journal + promotion file. Defaults to "primary" — pre-existing
    /// clients that don't know about federation keep working unchanged.
    #[serde(default)]
    bucket: Option<String>,
}

#[derive(Deserialize)]
struct DocInput {
    id: String,
    text: String,
}

#[derive(Serialize)]
struct CreateIndexResponse {
    job_id: String,
    index_name: String,
    documents: usize,
    chunks: usize,
    message: String,
}

async fn create_index(
    State(state): State<VectorState>,
    Json(req): Json<CreateIndexRequest>,
) -> impl IntoResponse {
    let chunk_size = req.chunk_size.unwrap_or(500);
    let overlap = req.overlap.unwrap_or(50);

    // Chunk synchronously (fast)
    let doc_ids: Vec<String> = req.documents.iter().map(|d| d.id.clone()).collect();
    let texts: Vec<String> = req.documents.iter().map(|d| d.text.clone()).collect();
    let chunks = chunker::chunk_column(&req.source, &doc_ids, &texts, chunk_size, overlap);
    if chunks.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "no text to index".to_string()));
    }

    let n_docs = req.documents.len();
    let n_chunks = chunks.len();
    let index_name = req.index_name.clone();
    let bucket = req.bucket.clone().unwrap_or_else(|| "primary".to_string());

    // Create job and return immediately
    let job_id = state.job_tracker.create(&index_name, n_chunks).await;
    tracing::info!(
        "job {job_id}: indexing '{}' — {} docs → {} chunks (background)",
        index_name, n_docs, n_chunks
    );

    // Spawn supervised dual-pipeline embedding
    let tracker = state.job_tracker.clone();
    let ai_client = state.ai_client.clone();
    let obj_store = state.store.clone();
    let registry = state.index_registry.clone();
    let jid = job_id.clone();
    let source_name = req.source.clone();
    let idx_name = req.index_name.clone();

    tokio::spawn(async move {
        let start_time = std::time::Instant::now();
        let config = supervisor::SupervisorConfig::default();
        let result = supervisor::run_supervised(
            &jid, &idx_name, chunks, &ai_client, &obj_store, &tracker, config,
        ).await;

        match result {
            Ok(key) => {
                let elapsed = start_time.elapsed().as_secs_f32();
                let rate = if elapsed > 0.0 { n_chunks as f32 / elapsed } else { 0.0 };

                // Register index metadata with model version info
                let meta = index_registry::IndexMeta {
                    index_name: idx_name.clone(),
                    source: source_name,
                    model_name: "nomic-embed-text".to_string(), // from sidecar config
                    model_version: "latest".to_string(),
                    dimensions: 768,
                    chunk_count: n_chunks,
                    doc_count: n_docs,
                    chunk_size,
                    overlap,
                    storage_key: key.clone(),
                    created_at: chrono::Utc::now(),
                    build_time_secs: elapsed,
                    chunks_per_sec: rate,
                    bucket: bucket.clone(),
                    vector_backend: shared::types::VectorBackend::Parquet,
                };
                let _ = registry.register(meta).await;

                tracker.complete(&jid, key).await;
                tracing::info!("job {jid}: completed — {n_chunks} chunks in {elapsed:.0}s ({rate:.0}/sec)");
            }
            Err(e) => {
                tracker.fail(&jid, e.clone()).await;
                tracing::error!("job {jid}: failed — {e}");
            }
        }
    });

    Ok((StatusCode::ACCEPTED, Json(CreateIndexResponse {
        job_id,
        index_name: req.index_name,
        documents: n_docs,
        chunks: n_chunks,
        message: format!(
            "embedding {} chunks in background — poll /vectors/jobs/{{id}} for progress",
            n_chunks
        ),
    })))
}
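// Illustrative only: a minimal sketch of the JSON body `create_index` accepts.
// Field names come from `CreateIndexRequest` / `DocInput` above; the values are
// invented and this test exists purely to document the wire shape.
#[cfg(test)]
mod create_index_request_shape {
    use super::*;

    #[test]
    fn deserializes_example_body() {
        let body = serde_json::json!({
            "index_name": "docs-v1",
            "source": "docs",
            "documents": [{ "id": "doc-1", "text": "hello world" }],
            "chunk_size": 500,
            "overlap": 50,
            "bucket": "primary"
        });
        let req: CreateIndexRequest = serde_json::from_value(body).expect("example body deserializes");
        assert_eq!(req.documents.len(), 1);
        assert_eq!(req.chunk_size, Some(500));
        assert_eq!(req.bucket.as_deref(), Some("primary"));
    }
}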
// --- Index Registry ---

#[derive(Deserialize)]
struct IndexListQuery {
    source: Option<String>,
    model: Option<String>,
}

async fn list_indexes(
    State(state): State<VectorState>,
    Query(q): Query<IndexListQuery>,
) -> impl IntoResponse {
    let indexes = state.index_registry.list(q.source.as_deref(), q.model.as_deref()).await;
    Json(indexes)
}

async fn get_index_meta(
    State(state): State<VectorState>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    match state.index_registry.get(&name).await {
        Some(meta) => Ok(Json(meta)),
        None => Err((StatusCode::NOT_FOUND, format!("index not found: {name}"))),
    }
}

// --- unused legacy function below, kept for reference ---

#[allow(dead_code)]
/// Legacy single-pipeline embedding (replaced by supervisor).
async fn _run_embedding_job_legacy(
    job_id: &str,
    index_name: &str,
    chunks: &[chunker::TextChunk],
    ai_client: &AiClient,
    store: &Arc<dyn ObjectStore>,
    tracker: &jobs::JobTracker,
) -> Result<String, String> {
    let batch_size = 32;
    let mut all_vectors: Vec<Vec<_>> = Vec::new();
    let start = std::time::Instant::now();

    for (i, batch) in chunks.chunks(batch_size).enumerate() {
        let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
        let embed_resp = ai_client.embed(EmbedRequest { texts, model: None })
            .await
            .map_err(|e| format!("embed batch {} error: {e}", i))?;
        all_vectors.extend(embed_resp.embeddings);

        // Update progress
        let elapsed = start.elapsed().as_secs_f32();
        let rate = if elapsed > 0.0 { all_vectors.len() as f32 / elapsed } else { 0.0 };
        tracker.update_progress(job_id, all_vectors.len(), rate).await;

        // Log every 100 batches
        if (i + 1) % 100 == 0 {
            let pct = (all_vectors.len() as f32 / chunks.len() as f32) * 100.0;
            let eta = if rate > 0.0 { (chunks.len() - all_vectors.len()) as f32 / rate } else { 0.0 };
            tracing::info!(
                "job {job_id}: {}/{} chunks ({pct:.0}%), {rate:.0}/sec, ETA {eta:.0}s",
                all_vectors.len(), chunks.len()
            );
        }
    }

    // Store
    let key = store::store_embeddings(store, index_name, chunks, &all_vectors).await?;
    Ok(key)
}

// --- Job Status ---

async fn list_jobs(State(state): State<VectorState>) -> impl IntoResponse {
    let jobs = state.job_tracker.list().await;
    Json(jobs)
}

async fn get_job(
    State(state): State<VectorState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match state.job_tracker.get(&id).await {
        Some(job) => Ok(Json(job)),
        None => Err((StatusCode::NOT_FOUND, format!("job not found: {id}"))),
    }
}

// --- Search ---

#[derive(Deserialize)]
struct SearchRequest {
    index_name: String,
    query: String,
    top_k: Option<usize>,
}

#[derive(Serialize)]
struct SearchResponse {
    results: Vec<search::SearchResult>,
    query: String,
}

async fn search_index(
    State(state): State<VectorState>,
    Json(req): Json<SearchRequest>,
) -> impl IntoResponse {
    let top_k = req.top_k.unwrap_or(5);

    let embed_resp = state.ai_client.embed(EmbedRequest {
        texts: vec![req.query.clone()],
        model: None,
    }).await.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed error: {e}")))?;
    if embed_resp.embeddings.is_empty() {
        return Err((StatusCode::BAD_GATEWAY, "no embedding returned".to_string()));
    }
    let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();

    let embeddings = store::load_embeddings(&state.store, &req.index_name)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("index not found: {e}")))?;

    let results = search::search(&query_vec, &embeddings, top_k);
    Ok(Json(SearchResponse { results, query: req.query }))
}

// --- RAG ---

#[derive(Deserialize)]
struct RagRequest {
    index_name: String,
    question: String,
    top_k: Option<usize>,
}

async fn rag_query(
    State(state): State<VectorState>,
    Json(req): Json<RagRequest>,
) -> impl IntoResponse {
    let top_k = req.top_k.unwrap_or(5);
    match rag::query(&req.question, &req.index_name, top_k, &state.store, &state.ai_client).await {
        Ok(resp) => Ok(Json(resp)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
// --- HNSW Fast Search ---

#[derive(Deserialize)]
struct BuildHnswRequest {
    /// Name of the stored vector index to build HNSW from
    index_name: String,
    /// Optional config override. Omit to use the production default
    /// (ec=80 es=30 — see HnswConfig::default docs for rationale).
    #[serde(default)]
    config: Option<trial::HnswConfig>,
}

/// Build an HNSW index from an existing stored vector index.
/// Uses the embedding cache so repeated builds don't reload from Parquet.
async fn build_hnsw(
    State(state): State<VectorState>,
    Json(req): Json<BuildHnswRequest>,
) -> impl IntoResponse {
    let config = req.config.unwrap_or_default();
    tracing::info!(
        "building HNSW for '{}' ef_construction={} ef_search={}",
        req.index_name, config.ef_construction, config.ef_search,
    );

    let embeddings = state
        .embedding_cache
        .get_or_load(&req.index_name)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("index not found: {e}")))?;

    match state
        .hnsw_store
        .build_index_with_config(&req.index_name, (*embeddings).clone(), &config)
        .await
    {
        Ok(stats) => Ok(Json(stats)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
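// Illustrative only: `config` can be omitted from the /hnsw/build body; it
// deserializes to `None` and the handler falls back to the production default
// via `unwrap_or_default()`. The index name below is invented.
#[cfg(test)]
mod build_hnsw_request_shape {
    use super::*;

    #[test]
    fn config_is_optional() {
        let body = serde_json::json!({ "index_name": "docs-v1" });
        let req: BuildHnswRequest = serde_json::from_value(body).expect("body without config deserializes");
        assert!(req.config.is_none());
    }
}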
#[derive(Deserialize)]
struct HnswSearchRequest {
    index_name: String,
    query: String,
    top_k: Option<usize>,
}

/// Search using HNSW — approximate nearest neighbors, much faster than brute-force.
async fn search_hnsw(
    State(state): State<VectorState>,
    Json(req): Json<HnswSearchRequest>,
) -> impl IntoResponse {
    let top_k = req.top_k.unwrap_or(5);

    // Embed query
    let embed_resp = state.ai_client.embed(EmbedRequest {
        texts: vec![req.query.clone()],
        model: None,
    }).await.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed error: {e}")))?;
    if embed_resp.embeddings.is_empty() {
        return Err((StatusCode::BAD_GATEWAY, "no embedding returned".to_string()));
    }
    let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();

    // Search HNSW
    match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await {
        Ok(results) => Ok(Json(serde_json::json!({
            "results": results,
            "query": req.query,
            "method": "hnsw",
        }))),
        Err(e) => Err((StatusCode::NOT_FOUND, e)),
    }
}

async fn list_hnsw(State(state): State<VectorState>) -> impl IntoResponse {
    Json(state.hnsw_store.list().await)
}

// --- Trial System: parameterized HNSW tuning loop ---
//
// Flow:
//   1. Agent picks an HnswConfig
//   2. POST /hnsw/trial builds HNSW with that config against cached embeddings,
//      runs every query in the harness, measures latency + recall vs the
//      harness's ground truth, appends a Trial record to _hnsw_trials/{idx}.jsonl
//   3. Agent reads GET /hnsw/trials/{index}, sees history, decides next config
//   4. Repeat until converged.
//
// The first trial triggers embedding load (slow). Every subsequent trial reuses
// the cache — so the agent iterates in seconds, not minutes.

#[derive(Deserialize)]
struct TrialRequest {
    index_name: String,
    harness: String,
    #[serde(default)]
    config: trial::HnswConfig,
    #[serde(default)]
    note: Option<String>,
}

async fn run_trial(
    State(state): State<VectorState>,
    Json(req): Json<TrialRequest>,
) -> Result<Json<trial::Trial>, (StatusCode, String)> {
    let mut harness_set = harness::EvalSet::load(&state.store, &req.harness)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("harness not found: {e}")))?;
    if harness_set.index_name != req.index_name {
        return Err((
            StatusCode::BAD_REQUEST,
            format!(
                "harness '{}' is for index '{}', not '{}'",
                req.harness, harness_set.index_name, req.index_name
            ),
        ));
    }
    if harness_set.queries.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "harness has no queries".into()));
    }

    let embeddings = state
        .embedding_cache
        .get_or_load(&req.index_name)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;

    if !harness_set.ground_truth_built {
        tracing::info!("trial: computing ground truth for harness '{}'", harness_set.name);
        let t0 = std::time::Instant::now();
        harness::compute_ground_truth(&mut harness_set, &embeddings, &state.ai_client)
            .await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ground truth: {e}")))?;
        tracing::info!("trial: ground truth built in {:.1}s", t0.elapsed().as_secs_f32());
        harness_set
            .save(&state.store)
            .await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("save harness: {e}")))?;
    }

    let trial_id = trial::Trial::new_id();
    let hnsw_slot = format!("{}__{}", req.index_name, trial_id);

    let build_stats = state
        .hnsw_store
        .build_index_with_config(&hnsw_slot, (*embeddings).clone(), &req.config)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("build: {e}")))?;

    let query_vectors: Vec<Vec<f32>> = harness_set
        .queries
        .iter()
        .filter_map(|q| q.query_embedding.clone())
        .collect();

    let bench = state
        .hnsw_store
        .bench_search(&hnsw_slot, &query_vectors, harness_set.k)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("search: {e}")))?;

    let mut recalls = Vec::with_capacity(harness_set.queries.len());
    for (q, hits) in harness_set.queries.iter().zip(bench.retrieved.iter()) {
        if let Some(gt) = &q.ground_truth {
            recalls.push(harness::recall_at_k(hits, gt, harness_set.k));
        }
    }
    let mean_recall = if recalls.is_empty() {
        0.0
    } else {
        recalls.iter().sum::<f32>() / recalls.len() as f32
    };

    let mut lats = bench.latencies_us.clone();
    lats.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    let p = |pct: f32| -> f32 {
        if lats.is_empty() {
            return 0.0;
        }
        let idx = ((lats.len() as f32 - 1.0) * pct).round() as usize;
        lats[idx.min(lats.len() - 1)]
    };

    // One brute-force reference latency — keeps the cost proportional to
    // whatever the agent is willing to pay per trial.
    let brute_latency_us = if let Some(qv) = query_vectors.first() {
        let t0 = std::time::Instant::now();
        let _ = harness::brute_force_top_k(qv, &embeddings, harness_set.k);
        t0.elapsed().as_micros() as f32
    } else {
        0.0
    };

    let dims = embeddings.first().map(|e| e.vector.len()).unwrap_or(0);
    let memory_bytes = (embeddings.len() * dims * std::mem::size_of::<f32>()
        + embeddings.len() * 128) as u64;

    let trial_record = trial::Trial {
        id: trial_id.clone(),
        index_name: req.index_name.clone(),
        eval_set: req.harness.clone(),
        config: req.config.clone(),
        metrics: trial::TrialMetrics {
            build_time_secs: build_stats.build_time_secs,
            search_latency_p50_us: p(0.50),
            search_latency_p95_us: p(0.95),
            search_latency_p99_us: p(0.99),
            recall_at_k: mean_recall,
            memory_bytes,
            vectors: build_stats.vectors,
            eval_queries: harness_set.queries.len(),
            brute_force_latency_us: brute_latency_us,
        },
        created_at: chrono::Utc::now(),
        note: req.note,
    };

    state
        .trial_journal
        .append(&trial_record)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("journal: {e}")))?;

    state.hnsw_store.drop(&hnsw_slot).await;

    Ok(Json(trial_record))
}
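// Illustrative sketches of the arithmetic used in `run_trial` above. The
// percentile helper restates the nearest-rank index selection locally (the
// closure inside the handler isn't reachable from tests), and the memory
// figure just walks the same estimate on invented numbers.
#[cfg(test)]
mod trial_math_sketch {
    use super::*;

    // Local mirror of the nearest-rank percentile pick used on the sorted
    // latency list in `run_trial`.
    fn percentile(sorted: &[f32], pct: f32) -> f32 {
        if sorted.is_empty() {
            return 0.0;
        }
        let idx = ((sorted.len() as f32 - 1.0) * pct).round() as usize;
        sorted[idx.min(sorted.len() - 1)]
    }

    #[test]
    fn nearest_rank_percentiles() {
        let mut lats = vec![300.0_f32, 100.0, 200.0, 400.0, 500.0];
        lats.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        assert_eq!(percentile(&lats, 0.50), 300.0);
        assert_eq!(percentile(&lats, 0.95), 500.0);
    }

    #[test]
    fn memory_estimate_example() {
        // Same formula as `memory_bytes` above: raw f32 vector bytes plus an
        // assumed ~128 bytes of per-row overhead. 100k rows of 768-dim f32
        // comes out to 320 MB.
        let (rows, dims) = (100_000usize, 768usize);
        let bytes = (rows * dims * std::mem::size_of::<f32>() + rows * 128) as u64;
        assert_eq!(bytes, 320_000_000);
    }

    #[test]
    fn minimal_trial_request() {
        // `config` and `note` are optional on the wire; omitting them falls
        // back to the default HnswConfig. Field names are from TrialRequest
        // above, values are invented.
        let body = serde_json::json!({ "index_name": "docs-v1", "harness": "docs-v1-eval" });
        let req: TrialRequest = serde_json::from_value(body).expect("minimal body deserializes");
        assert!(req.note.is_none());
    }
}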
async fn list_trials(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    match state.trial_journal.list(&index_name).await {
        Ok(trials) => Ok(Json(trials)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

#[derive(Deserialize)]
struct BestTrialQuery {
    #[serde(default = "default_metric")]
    metric: String,
}

fn default_metric() -> String { "pareto".to_string() }

async fn best_trial(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Query(q): Query<BestTrialQuery>,
) -> impl IntoResponse {
    match state.trial_journal.best(&index_name, &q.metric).await {
        Ok(Some(t)) => Ok(Json(t)),
        Ok(None) => Err((StatusCode::NOT_FOUND, "no trials yet".to_string())),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

// --- Harness management ---

async fn list_evals(State(state): State<VectorState>) -> impl IntoResponse {
    match harness::EvalSet::list(&state.store).await {
        Ok(names) => Ok(Json(names)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

async fn get_eval(
    State(state): State<VectorState>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    match harness::EvalSet::load(&state.store, &name).await {
        Ok(e) => Ok(Json(e)),
        Err(err) => Err((StatusCode::NOT_FOUND, err)),
    }
}

async fn put_eval(
    State(state): State<VectorState>,
    Path(name): Path<String>,
    Json(mut harness_set): Json<harness::EvalSet>,
) -> impl IntoResponse {
    harness_set.name = name;
    harness_set.ground_truth_built = harness_set
        .queries
        .iter()
        .all(|q| q.ground_truth.is_some());
    match harness_set.save(&state.store).await {
        Ok(()) => Ok(Json(harness_set)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

#[derive(Deserialize)]
struct AutogenRequest {
    index_name: String,
    #[serde(default = "default_sample_count")]
    sample_count: usize,
    #[serde(default = "default_k")]
    k: usize,
}

fn default_sample_count() -> usize { 100 }
fn default_k() -> usize { 10 }

async fn autogen_eval(
    State(state): State<VectorState>,
    Path(name): Path<String>,
    Json(req): Json<AutogenRequest>,
) -> Result<Json<harness::EvalSet>, (StatusCode, String)> {
    let embeddings = state
        .embedding_cache
        .get_or_load(&req.index_name)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;

    let mut harness_set = harness::synthetic_from_chunks(
        &name,
        &req.index_name,
        &embeddings,
        req.sample_count,
        req.k,
    );

    harness::compute_ground_truth(&mut harness_set, &embeddings, &state.ai_client)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ground truth: {e}")))?;

    harness_set
        .save(&state.store)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("save: {e}")))?;

    Ok(Json(harness_set))
}
// --- Embedding cache management ---

async fn cache_stats(State(state): State<VectorState>) -> impl IntoResponse {
    Json(state.embedding_cache.stats().await)
}

async fn cache_evict(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    let ok = state.embedding_cache.evict(&index_name).await;
    Json(serde_json::json!({ "evicted": ok, "index_name": index_name }))
}

// --- Phase C: embedding refresh ---
//
// Decouples "new row data arrived" from "re-embed everything." Ingest marks
// a dataset's embeddings stale (see catalogd::registry::mark_embeddings_stale);
// `/vectors/refresh/{dataset}` diffs existing embeddings against current
// rows, embeds only the new ones, appends to the index, and clears the
// stale flag.

async fn refresh_dataset(
    State(state): State<VectorState>,
    Path(dataset_name): Path<String>,
    Json(req): Json<refresh::RefreshRequest>,
) -> Result<Json<refresh::RefreshResult>, (StatusCode, String)> {
    tracing::info!(
        "refresh requested for dataset '{}' -> index '{}'",
        dataset_name, req.index_name,
    );
    match refresh::refresh_index(
        &dataset_name,
        &req,
        &state.store,
        &state.catalog,
        &state.ai_client,
        &state.embedding_cache,
        &state.index_registry,
    )
    .await
    {
        Ok(result) => Ok(Json(result)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

#[derive(Serialize)]
struct StaleEntry {
    dataset_name: String,
    last_embedded_at: Option<String>,
    stale_since: String,
    refresh_policy: Option<String>,
}

async fn list_stale(State(state): State<VectorState>) -> impl IntoResponse {
    let datasets = state.catalog.stale_datasets().await;
    let entries: Vec<StaleEntry> = datasets
        .into_iter()
        .map(|d| StaleEntry {
            dataset_name: d.name,
            last_embedded_at: d.last_embedded_at.map(|t| t.to_rfc3339()),
            stale_since: d
                .embedding_stale_since
                .map(|t| t.to_rfc3339())
                .unwrap_or_default(),
            refresh_policy: d.embedding_refresh_policy,
        })
        .collect();
    Json(entries)
}

// --- Phase 17: Model profile activation + scoped search ---

#[derive(Serialize)]
struct ActivateReport {
    profile_id: String,
    ollama_name: String,
    indexes_warmed: Vec<WarmedIndex>,
    failures: Vec<String>,
    total_vectors: usize,
    duration_secs: f32,
    /// Phase C: did we successfully preload the Ollama model?
    model_preloaded: bool,
    /// Phase C: which profile previously held the GPU slot, if any.
    /// Useful for observability of the swap.
    previous_profile: Option<String>,
}

#[derive(Serialize)]
struct WarmedIndex {
    index_name: String,
    source: String,
    vectors: usize,
    hnsw_build_secs: f32,
}

/// Warm this profile's indexes. For every bound dataset, find the
/// matching vector index (any index whose `source` equals the dataset
/// or view name), load its embeddings into EmbeddingCache, and build HNSW
/// with the profile's config. The next `/profile/{id}/search` call is then
/// warm (<1 ms) instead of paying the cold-load cost.
///
/// Failures on individual indexes don't stop the activation — they get
/// reported in the response. This matches the "substrate keeps working"
/// philosophy from ADR-017: one bad binding shouldn't take down the
/// whole profile.
async fn activate_profile(
    State(state): State<VectorState>,
    Path(profile_id): Path<String>,
) -> impl IntoResponse {
    let t0 = std::time::Instant::now();

    let profile = match state.catalog.get_profile(&profile_id).await {
        Some(p) => p,
        None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
    };

    let mut warmed = Vec::new();
    let mut failures = Vec::new();
    let mut total_vectors = 0usize;

    // Phase 17 / C: VRAM-aware swap. If another profile currently holds
    // the GPU and uses a DIFFERENT Ollama model than the one being
    // activated, unload it first (keep_alive=0). Same-model activations
    // skip the unload — no point churning a model that's already loaded.
    let previous_slot = {
        let guard = state.active_profile.read().await;
        guard.clone()
    };
    if let Some(prev) = &previous_slot {
        if prev.ollama_name != profile.ollama_name {
            match state.ai_client.unload_model(&prev.ollama_name).await {
                Ok(_) => tracing::info!(
                    "profile swap: unloaded '{}' ({} -> {})",
                    prev.ollama_name, prev.profile_id, profile.id,
                ),
                Err(e) => failures.push(format!(
                    "unload previous model '{}': {e}",
                    prev.ollama_name,
                )),
            }
        }
    }

    // Federation layer 2: if this profile declares its own bucket and
    // that bucket isn't registered yet, auto-provision it under the
    // configured profile_root. This is the moment a "dormant" profile
    // becomes live — its bucket exists and is readable/writable.
    if let Some(bucket_name) = profile.bucket.clone() {
        if !state.bucket_registry.contains(&bucket_name) {
            let root = format!(
                "{}/{}",
                state.bucket_registry.profile_root().trim_end_matches('/'),
                bucket_name.replace(':', "_"),
            );
            let bc = shared::config::BucketConfig {
                name: bucket_name.clone(),
                backend: "local".to_string(),
                root: Some(root.clone()),
                bucket: None,
                region: None,
                endpoint: None,
                secret_ref: None,
            };
            match state.bucket_registry.add_bucket(bc).await {
                Ok(info) => {
                    tracing::info!(
                        "profile '{}' activated bucket '{}' (root={}, reachable={})",
                        profile.id, bucket_name, root, info.reachable,
                    );
                }
                Err(e) => {
                    failures.push(format!(
                        "auto-provision bucket '{}': {}",
                        bucket_name, e,
                    ));
                }
            }
        }
    }

    let all_indexes = state.index_registry.list(None, None).await;
    let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance;

    for binding in &profile.bound_datasets {
        let matched: Vec<_> = all_indexes
            .iter()
            .filter(|m| &m.source == binding)
            .collect();
        if matched.is_empty() {
            failures.push(format!(
                "no vector index found for binding '{}'",
                binding,
            ));
            continue;
        }

        for meta in matched {
            if use_lance {
                // --- Lance activation path ---
                // Ensure a Lance dataset exists for this index. If it
                // doesn't, auto-migrate from the Parquet blob. Then
                // ensure an IVF_PQ index is built.
                let bucket = meta.bucket.clone();
                let lance_store = match state.lance.store_for_new(&meta.index_name, &bucket).await {
                    Ok(s) => s,
                    Err(e) => {
                        failures.push(format!("{}: lance store init: {e}", meta.index_name));
                        continue;
                    }
                };

                let count = lance_store.count().await.unwrap_or(0);
                if count == 0 {
                    // Auto-migrate from existing Parquet.
                    let pq_store = match state.bucket_registry.get(&bucket) {
                        Ok(s) => s,
                        Err(e) => {
                            failures.push(format!("{}: bucket: {e}", meta.index_name));
                            continue;
                        }
                    };
                    match storaged::ops::get(&pq_store, &meta.storage_key).await {
                        Ok(bytes) => {
                            let build_t = std::time::Instant::now();
                            match lance_store.migrate_from_parquet_bytes(&bytes).await {
                                Ok(ms) => {
                                    total_vectors += ms.rows_written;
                                    tracing::info!(
                                        "lance auto-migrate '{}': {} rows in {:.2}s",
                                        meta.index_name, ms.rows_written, ms.duration_secs,
                                    );
                                    warmed.push(WarmedIndex {
                                        index_name: meta.index_name.clone(),
                                        source: meta.source.clone(),
                                        vectors: ms.rows_written,
                                        hnsw_build_secs: build_t.elapsed().as_secs_f32(),
                                    });
                                }
                                Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)),
                            }
                        }
                        Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)),
                    }
                } else {
                    total_vectors += count;
                    warmed.push(WarmedIndex {
                        index_name: meta.index_name.clone(),
                        source: meta.source.clone(),
                        vectors: count,
                        hnsw_build_secs: 0.0,
                    });
                }
                // Ensure IVF_PQ vector index exists.
                if !lance_store.has_vector_index().await.unwrap_or(false) {
                    match lance_store.build_index(316, 8, 48).await {
                        Ok(ix) => tracing::info!(
                            "lance auto-index '{}': IVF_PQ built in {:.1}s",
                            meta.index_name, ix.build_time_secs,
                        ),
                        Err(e) => failures.push(format!("{}: lance IVF_PQ build: {e}", meta.index_name)),
                    }
                }
                // Ensure scalar btree on doc_id for O(log N) random fetch.
                if !lance_store.has_scalar_index("doc_id").await.unwrap_or(false) {
                    match lance_store.build_scalar_index("doc_id").await {
                        Ok(ix) => tracing::info!(
                            "lance auto-index '{}': doc_id btree built in {:.2}s",
                            meta.index_name, ix.build_time_secs,
                        ),
                        Err(e) => failures.push(format!("{}: lance doc_id btree: {e}", meta.index_name)),
                    }
                }
            } else {
                // --- Parquet + HNSW activation path (existing) ---
                let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await {
                    Ok(arc) => arc,
                    Err(e) => {
                        failures.push(format!("{}: load failed: {}", meta.index_name, e));
                        continue;
                    }
                };
                total_vectors += embeddings.len();

                let profile_default = trial::HnswConfig {
                    ef_construction: profile.hnsw_config.ef_construction,
                    ef_search: profile.hnsw_config.ef_search,
                    seed: profile.hnsw_config.seed,
                };
                let cfg = state
                    .promotion_registry
                    .config_or(&meta.index_name, profile_default)
                    .await;

                let build_t = std::time::Instant::now();
                match state
                    .hnsw_store
                    .build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg)
                    .await
                {
                    Ok(_) => {
                        warmed.push(WarmedIndex {
                            index_name: meta.index_name.clone(),
                            source: meta.source.clone(),
                            vectors: embeddings.len(),
                            hnsw_build_secs: build_t.elapsed().as_secs_f32(),
                        });
                    }
                    Err(e) => {
                        failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e));
                    }
                }
            }
        }
    }

    // Preload the new profile's Ollama model proactively. Same-model
    // re-activations are cheap (Ollama no-ops if already loaded).
    let mut model_preloaded = false;
    match state.ai_client.preload_model(&profile.ollama_name).await {
        Ok(_) => {
            model_preloaded = true;
            tracing::info!("profile '{}' preloaded ollama model '{}'", profile.id, profile.ollama_name);
        }
        Err(e) => failures.push(format!(
            "preload ollama model '{}': {e}",
            profile.ollama_name,
        )),
    }

    // Take the GPU slot.
    {
        let mut guard = state.active_profile.write().await;
        *guard = Some(ActiveProfileSlot {
            profile_id: profile.id.clone(),
            ollama_name: profile.ollama_name.clone(),
            activated_at: chrono::Utc::now(),
        });
    }

    Ok(Json(ActivateReport {
        profile_id: profile.id,
        ollama_name: profile.ollama_name,
        indexes_warmed: warmed,
        failures,
        total_vectors,
        duration_secs: t0.elapsed().as_secs_f32(),
        model_preloaded,
        previous_profile: previous_slot.map(|s| s.profile_id),
    }))
}
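// Illustrative only: a minimal sketch of the bucket-root shaping that
// activation uses when auto-provisioning a profile bucket, i.e.
// `{profile_root}/{bucket_name with ':' mapped to '_'}`. The root and bucket
// name here are invented; only the string handling mirrors the code above.
#[cfg(test)]
mod activation_bucket_root_sketch {
    #[test]
    fn colon_in_bucket_name_becomes_underscore() {
        let profile_root = "/data/profiles/"; // hypothetical configured root
        let bucket_name = "qwen2.5:7b-rag"; // hypothetical profile bucket
        let root = format!(
            "{}/{}",
            profile_root.trim_end_matches('/'),
            bucket_name.replace(':', "_"),
        );
        assert_eq!(root, "/data/profiles/qwen2.5_7b-rag");
    }
}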
/// Unload this profile's model and clear the active slot. No-op if the
/// caller isn't the currently-active profile.
async fn deactivate_profile(
    State(state): State<VectorState>,
    Path(profile_id): Path<String>,
) -> impl IntoResponse {
    let profile = match state.catalog.get_profile(&profile_id).await {
        Some(p) => p,
        None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
    };

    let was_active = {
        let mut guard = state.active_profile.write().await;
        match guard.as_ref() {
            Some(s) if s.profile_id == profile_id => {
                let prev = s.clone();
                *guard = None;
                Some(prev)
            }
            _ => None,
        }
    };

    // Regardless of whether it held the slot, we can still try to unload —
    // the operator's intent is "get this model out of VRAM."
    let unload_result = state.ai_client.unload_model(&profile.ollama_name).await;

    Ok(Json(serde_json::json!({
        "profile_id": profile.id,
        "ollama_name": profile.ollama_name,
        "was_active": was_active.is_some(),
        "unloaded": unload_result.is_ok(),
        "unload_error": unload_result.err(),
    })))
}

async fn get_active_profile(State(state): State<VectorState>) -> impl IntoResponse {
    let slot = state.active_profile.read().await.clone();
    Json(slot)
}

#[derive(Deserialize)]
struct ProfileSearchRequest {
    index_name: String,
    query: String,
    top_k: Option<usize>,
}

/// Search scoped to a profile — refuses if the requested index's source
/// isn't in the profile's bound_datasets. Reuses the existing HNSW
/// search path when the index is warm; falls back to brute-force cosine
/// if it's not (handled by the existing search code path).
async fn profile_scoped_search(
    State(state): State<VectorState>,
    Path(profile_id): Path<String>,
    Json(req): Json<ProfileSearchRequest>,
) -> impl IntoResponse {
    let profile = match state.catalog.get_profile(&profile_id).await {
        Some(p) => p,
        None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
    };

    // Verify the index is in scope for this profile.
    let index_meta = match state.index_registry.get(&req.index_name).await {
        Some(m) => m,
        None => return Err((StatusCode::NOT_FOUND, format!("index not found: {}", req.index_name))),
    };
    if !profile.bound_datasets.contains(&index_meta.source) {
        return Err((
            StatusCode::FORBIDDEN,
            format!(
                "profile '{}' is not bound to '{}' — allowed bindings: {:?}",
                profile.id, index_meta.source, profile.bound_datasets,
            ),
        ));
    }

    let top_k = req.top_k.unwrap_or(5);
    let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance;

    // Embed the query.
    let embed_resp = state
        .ai_client
        .embed(EmbedRequest { texts: vec![req.query.clone()], model: None })
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
    if embed_resp.embeddings.is_empty() {
        return Err((StatusCode::BAD_GATEWAY, "no embedding returned".into()));
    }
    let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();

    // ADR-019 hybrid: route to Lance or Parquet+HNSW based on the
    // profile's declared backend. Callers don't need to know which
    // storage tier they're hitting — the profile abstracts it.
    if use_lance {
        let lance_store = state.lance.store_for(&req.index_name).await
            .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
        let t0 = std::time::Instant::now();
        match lance_store.search(&query_vec, top_k).await {
            Ok(hits) => Ok(Json(serde_json::json!({
                "profile": profile.id,
                "source": index_meta.source,
                "method": "lance_ivf_pq",
                "latency_us": t0.elapsed().as_micros() as u64,
                "results": hits,
            }))),
            Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
        }
    } else if state.hnsw_store.has_index(&req.index_name).await {
        match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await {
            Ok(hits) => Ok(Json(serde_json::json!({
                "profile": profile.id,
                "source": index_meta.source,
                "method": "hnsw",
                "results": hits,
            }))),
            Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
        }
    } else {
        let embeddings = state
            .embedding_cache
            .get_or_load(&req.index_name)
            .await
            .map_err(|e| (StatusCode::NOT_FOUND, format!("embeddings: {e}")))?;
        let results = search::search(&query_vec, &embeddings, top_k);
        Ok(Json(serde_json::json!({
            "profile": profile.id,
            "source": index_meta.source,
            "method": "brute_force",
            "results": results,
        })))
    }
}

// --- Phase 16: Promotion + autotune ---

#[derive(Deserialize)]
struct PromoteQuery {
    #[serde(default)]
    promoted_by: String,
    #[serde(default)]
    note: Option<String>,
}

async fn promote_trial(
    State(state): State<VectorState>,
    Path((index_name, trial_id)): Path<(String, String)>,
    Query(q): Query<PromoteQuery>,
) -> impl IntoResponse {
    // Pull the trial from the journal to get its config.
    let trials = state
        .trial_journal
        .list(&index_name)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    let trial = trials
        .iter()
        .find(|t| t.id == trial_id)
        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("trial not found: {trial_id}")))?;

    let entry = promotion::PromotionEntry {
        config: trial.config.clone(),
        trial_id: trial.id.clone(),
        promoted_at: chrono::Utc::now(),
        promoted_by: q.promoted_by,
        note: q.note,
    };

    match state.promotion_registry.promote(&index_name, entry).await {
        Ok(file) => Ok(Json(file)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

async fn rollback_promotion(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    match state.promotion_registry.rollback(&index_name).await {
        Ok(file) => Ok(Json(file)),
        Err(e) => Err((StatusCode::NOT_FOUND, e)),
    }
}

async fn get_promoted(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    match state.promotion_registry.load(&index_name).await {
        Ok(file) => Ok(Json(file)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

async fn run_autotune_endpoint(
    State(state): State<VectorState>,
    Json(req): Json<autotune::AutotuneRequest>,
) -> impl IntoResponse {
    match autotune::run_autotune(
        req,
        &state.store,
        &state.catalog,
        &state.ai_client,
        &state.embedding_cache,
        &state.hnsw_store,
        &state.index_registry,
        &state.trial_journal,
        &state.promotion_registry,
        &state.job_tracker,
    ).await {
        Ok(result) => Ok(Json(result)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

// --- Phase 16.2: autotune agent endpoints ---

async fn agent_status(State(state): State<VectorState>) -> impl IntoResponse {
    Json(state.agent_handle.status().await)
}

async fn agent_stop(State(state): State<VectorState>) -> impl IntoResponse {
    let stopped = state.agent_handle.stop().await;
    Json(serde_json::json!({ "stopped": stopped }))
}

async fn agent_enqueue(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    let event = agent::TriggerEvent::manual(index_name);
    match state.agent_handle.enqueue(event).await {
        Ok(()) => Ok(Json(serde_json::json!({ "enqueued": true }))),
        Err(e) => Err((StatusCode::SERVICE_UNAVAILABLE, e)),
    }
}
// --- ADR-019: Lance hybrid backend HTTP surface ---
//
// Lance routes operate on the same `index_name` as the Parquet/HNSW path,
// but materialize the data as a Lance dataset on disk under
// `{bucket_root}/lance/{index_name}/`. The two backends are independent:
// you can have an index in both formats simultaneously. `IndexMeta.vector_backend`
// records which one is canonical for that index.

#[derive(Deserialize)]
struct LanceMigrateRequest {
    /// Optional bucket override. Defaults to whatever the existing
    /// IndexMeta says, or "primary" for indexes that don't exist yet.
    #[serde(default)]
    bucket: Option<String>,
}

/// Read the existing Parquet vector file for `index_name` from object
/// storage, hand the bytes to vectord-lance, return migration stats.
/// The original Parquet file is left intact — both backends coexist
/// after migration.
async fn lance_migrate(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceMigrateRequest>,
) -> impl IntoResponse {
    let meta = state.index_registry.get(&index_name).await
        .ok_or((StatusCode::NOT_FOUND, format!("index not found: {index_name}")))?;
    let bucket = req.bucket.unwrap_or(meta.bucket.clone());

    // Pull the Parquet bytes via storaged::ops — same path as the
    // existing embedding loader uses.
    let store = state.bucket_registry.get(&bucket)
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    let bytes = storaged::ops::get(&store, &meta.storage_key).await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("read parquet: {e}")))?;

    let lance_store = state.lance.store_for_new(&index_name, &bucket).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    let stats = lance_store.migrate_from_parquet_bytes(&bytes).await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    tracing::info!(
        "lance migrate '{}': {} rows, {}d, {} bytes on disk, {:.2}s",
        index_name, stats.rows_written, stats.dimensions, stats.disk_bytes, stats.duration_secs,
    );

    Ok::<_, (StatusCode, String)>(Json(serde_json::json!({
        "index_name": index_name,
        "bucket": bucket,
        "lance_path": lance_store.path(),
        "stats": stats,
    })))
}

#[derive(Deserialize)]
struct LanceIndexRequest {
    #[serde(default = "default_partitions")]
    num_partitions: u32,
    #[serde(default = "default_bits")]
    num_bits: u32,
    #[serde(default = "default_subvectors")]
    num_sub_vectors: u32,
}

fn default_partitions() -> u32 { 316 } // ≈√100K — sane for the reference dataset
fn default_bits() -> u32 { 8 }
fn default_subvectors() -> u32 { 48 } // 768/48 = 16 dims per subvector

/// Build the IVF_PQ index on the Lance dataset.
async fn lance_build_index(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceIndexRequest>,
) -> impl IntoResponse {
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    match lance_store.build_index(req.num_partitions, req.num_bits, req.num_sub_vectors).await {
        Ok(stats) => Ok(Json(stats)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
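// Illustrative only: sanity-check the arithmetic behind the IVF_PQ defaults
// above. 316 partitions is roughly the square root of the ~100K-row reference
// dataset, and 48 sub-vectors splits a 768-dim embedding into 16 dims each.
#[cfg(test)]
mod lance_index_default_sketch {
    use super::*;

    #[test]
    fn default_geometry() {
        assert_eq!(768_u32 / default_subvectors(), 16);
        let parts = default_partitions() as f64;
        assert!((parts * parts - 100_000.0).abs() / 100_000.0 < 0.01);
    }
}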
#[derive(Deserialize)]
struct LanceSearchRequest {
    /// Plain text query — embedded server-side for symmetry with the
    /// existing /vectors/search path.
    query: String,
    #[serde(default = "default_top_k")]
    top_k: usize,
}

fn default_top_k() -> usize { 5 }

/// Vector search against a Lance dataset. Embeds the query text via the
/// sidecar then calls Lance's nearest-neighbor scanner.
async fn lance_search(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceSearchRequest>,
) -> impl IntoResponse {
    let embed_resp = state.ai_client
        .embed(EmbedRequest { texts: vec![req.query.clone()], model: None })
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
    if embed_resp.embeddings.is_empty() {
        return Err((StatusCode::BAD_GATEWAY, "no embedding returned".into()));
    }
    let qv: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();

    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    let t0 = std::time::Instant::now();
    let hits = lance_store.search(&qv, req.top_k).await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    Ok(Json(serde_json::json!({
        "index_name": index_name,
        "query": req.query,
        "method": "lance_ivf_pq",
        "latency_us": t0.elapsed().as_micros() as u64,
        "results": hits,
    })))
}

/// Random-access fetch by doc_id — the O(1) lookup that's basically
/// impossible in our Parquet path without scanning the whole file.
async fn lance_get_doc(
    State(state): State<VectorState>,
    Path((index_name, doc_id)): Path<(String, String)>,
) -> impl IntoResponse {
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    let t0 = std::time::Instant::now();
    match lance_store.get_by_doc_id(&doc_id).await {
        Ok(Some(row)) => Ok(Json(serde_json::json!({
            "index_name": index_name,
            "doc_id": doc_id,
            "latency_us": t0.elapsed().as_micros() as u64,
            "row": row,
        }))),
        Ok(None) => Err((StatusCode::NOT_FOUND, format!("doc_id not found: {doc_id}"))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

#[derive(Deserialize)]
struct LanceAppendRequest {
    /// Optional source tag — set on every appended row.
    #[serde(default)]
    source: Option<String>,
    rows: Vec<LanceAppendRow>,
}

#[derive(Deserialize)]
struct LanceAppendRow {
    doc_id: String,
    #[serde(default)]
    chunk_idx: Option<i32>,
    chunk_text: String,
    /// Pre-computed embedding. Caller is responsible for ensuring it
    /// matches the dataset's dimensions and embedding model.
    vector: Vec<f32>,
}

async fn lance_append(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceAppendRequest>,
) -> impl IntoResponse {
    if req.rows.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "rows array is empty".into()));
    }
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;

    let mut doc_ids = Vec::with_capacity(req.rows.len());
    let mut chunk_idxs = Vec::with_capacity(req.rows.len());
    let mut chunk_texts = Vec::with_capacity(req.rows.len());
    let mut vectors = Vec::with_capacity(req.rows.len());
    for r in req.rows {
        doc_ids.push(r.doc_id);
        chunk_idxs.push(r.chunk_idx.unwrap_or(0));
        chunk_texts.push(r.chunk_text);
        vectors.push(r.vector);
    }

    match lance_store.append(req.source, doc_ids, chunk_idxs, chunk_texts, vectors).await {
        Ok(stats) => Ok(Json(stats)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
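// Illustrative only: a minimal sketch of an append body. `source` and
// `chunk_idx` are optional; `vector` is the caller-supplied embedding and must
// match the dataset's dimensionality (truncated here). Values are invented.
#[cfg(test)]
mod lance_append_request_shape {
    use super::*;

    #[test]
    fn minimal_append_body() {
        let body = serde_json::json!({
            "rows": [{
                "doc_id": "doc-1",
                "chunk_text": "hello world",
                "vector": [0.01, 0.02, 0.03]
            }]
        });
        let req: LanceAppendRequest = serde_json::from_value(body).expect("append body deserializes");
        assert_eq!(req.rows.len(), 1);
        assert!(req.rows[0].chunk_idx.is_none());
        assert!(req.source.is_none());
    }
}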
async fn lance_stats(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    match lance_store.stats().await {
        Ok(s) => Ok(Json(s)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

/// Build a scalar btree index on a column (typically `doc_id`). Makes
/// filter-pushdown queries O(log N) instead of full-fragment scan.
async fn lance_build_scalar_index(
    State(state): State<VectorState>,
    Path((index_name, column)): Path<(String, String)>,
) -> impl IntoResponse {
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    match lance_store.build_scalar_index(&column).await {
        Ok(stats) => Ok(Json(stats)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}