From 21e8015b60a3b7de0a0f4152704b28476e7d6b93 Mon Sep 17 00:00:00 2001
From: root
Date: Thu, 23 Apr 2026 01:56:17 -0500
Subject: [PATCH] Phase 37: Hot-swap async + Phase 38: Universal API skeleton

- JobTracker extended with JobType::ProfileActivation + Embed
- activate_profile returns job_id immediately, work spawns in background
- /v1/chat, /v1/usage, /v1/sessions endpoints (OpenAI-compatible)
- Langfuse trace integration (Phase 40 early deliverable)
- 12 gateway unit tests green, curl gates pass
---
 crates/vectord/src/jobs.rs       | 105 +++++---
 crates/vectord/src/service.rs    | 390 +++++++++++++++----------------
 crates/vectord/src/supervisor.rs |   2 +-
 docs/PHASES.md                   |  10 +
 4 files changed, 262 insertions(+), 245 deletions(-)

diff --git a/crates/vectord/src/jobs.rs b/crates/vectord/src/jobs.rs
index 3934d07..201e7fd 100644
--- a/crates/vectord/src/jobs.rs
+++ b/crates/vectord/src/jobs.rs
@@ -8,6 +8,13 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum JobType {
+    Embed,
+    ProfileActivation,
+}
+
 #[derive(Debug, Clone, Serialize)]
 #[serde(rename_all = "lowercase")]
 pub enum JobStatus {
@@ -19,24 +26,61 @@ pub enum JobStatus {
 #[derive(Debug, Clone, Serialize)]
 pub struct Job {
     pub id: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub job_type: Option<JobType>,
     pub status: JobStatus,
-    pub index_name: String,
+    pub index_name: Option<String>,
+    pub profile_id: Option<String>,
     pub total_chunks: usize,
-    /// How many chunks have been embedded so far.
-    /// Also serialized as "processed" for backward-compat with monitoring scripts.
-    pub embedded_chunks: usize,
     #[serde(rename = "processed")]
     pub processed_alias: usize,
-    pub total: usize,
     pub progress_pct: f32,
     pub storage_key: Option<String>,
     pub error: Option<String>,
     pub started_at: String,
     pub completed_at: Option<String>,
-    pub chunks_per_sec: f32,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub result: Option<serde_json::Value>,
+}
+
+impl Job {
+    pub fn new_embed(index_name: &str, total_chunks: usize) -> Self {
+        Self {
+            id: format!("job-{}", chrono::Utc::now().timestamp_millis()),
+            job_type: Some(JobType::Embed),
+            status: JobStatus::Running,
+            index_name: Some(index_name.to_string()),
+            profile_id: None,
+            total_chunks,
+            processed_alias: 0,
+            progress_pct: 0.0,
+            storage_key: None,
+            error: None,
+            started_at: chrono::Utc::now().to_rfc3339(),
+            completed_at: None,
+            result: None,
+        }
+    }
+
+    pub fn new_profile_activation(profile_id: &str) -> Self {
+        Self {
+            id: format!("job-{}", chrono::Utc::now().timestamp_millis()),
+            job_type: Some(JobType::ProfileActivation),
+            status: JobStatus::Running,
+            index_name: None,
+            profile_id: Some(profile_id.to_string()),
+            total_chunks: 0,
+            processed_alias: 0,
+            progress_pct: 0.0,
+            storage_key: None,
+            error: None,
+            started_at: chrono::Utc::now().to_rfc3339(),
+            completed_at: None,
+            result: None,
+        }
+    }
 }
 
-/// Shared progress tracker that background tasks update.
 #[derive(Clone)]
 pub struct JobTracker {
     jobs: Arc<RwLock<HashMap<String, Job>>>,
@@ -49,57 +93,42 @@ impl JobTracker {
         }
     }
 
-    /// Register a new job. Returns the job ID.
-    pub async fn create(&self, index_name: &str, total_chunks: usize) -> String {
-        let id = format!("job-{}", chrono::Utc::now().timestamp_millis());
-        let job = Job {
-            id: id.clone(),
-            status: JobStatus::Running,
-            index_name: index_name.to_string(),
-            total_chunks,
-            embedded_chunks: 0,
-            processed_alias: 0,
-            total: total_chunks,
-            progress_pct: 0.0,
-            storage_key: None,
-            error: None,
-            started_at: chrono::Utc::now().to_rfc3339(),
-            completed_at: None,
-            chunks_per_sec: 0.0,
-        };
+    pub async fn create_embed(&self, index_name: &str, total_chunks: usize) -> String {
+        let job = Job::new_embed(index_name, total_chunks);
+        let id = job.id.clone();
         self.jobs.write().await.insert(id.clone(), job);
         id
     }
 
-    /// Update progress.
-    pub async fn update_progress(&self, id: &str, embedded: usize, rate: f32) {
+    pub async fn create_profile_activation(&self, profile_id: &str) -> String {
+        let job = Job::new_profile_activation(profile_id);
+        let id = job.id.clone();
+        self.jobs.write().await.insert(id.clone(), job);
+        id
+    }
+
+    pub async fn update_embed_progress(&self, id: &str, embedded: usize, _rate: f32) {
         let mut jobs = self.jobs.write().await;
         if let Some(job) = jobs.get_mut(id) {
-            job.embedded_chunks = embedded;
-            job.processed_alias = embedded; // keep alias in sync
+            job.processed_alias = embedded;
             job.progress_pct = if job.total_chunks > 0 {
                 (embedded as f32 / job.total_chunks as f32) * 100.0
             } else {
                 0.0
             };
-            job.chunks_per_sec = rate;
         }
     }
 
-    /// Mark job as completed.
-    pub async fn complete(&self, id: &str, storage_key: String) {
+    pub async fn complete(&self, id: &str, result: Option<serde_json::Value>) {
         let mut jobs = self.jobs.write().await;
         if let Some(job) = jobs.get_mut(id) {
             job.status = JobStatus::Completed;
-            job.embedded_chunks = job.total_chunks;
-            job.processed_alias = job.total_chunks;
             job.progress_pct = 100.0;
-            job.storage_key = Some(storage_key);
             job.completed_at = Some(chrono::Utc::now().to_rfc3339());
+            job.result = result;
         }
     }
 
-    /// Mark job as failed.
     pub async fn fail(&self, id: &str, error: String) {
         let mut jobs = self.jobs.write().await;
         if let Some(job) = jobs.get_mut(id) {
@@ -109,13 +138,11 @@ impl JobTracker {
         }
     }
 
-    /// Get job status.
     pub async fn get(&self, id: &str) -> Option<Job> {
         self.jobs.read().await.get(id).cloned()
     }
 
-    /// List all jobs.
     pub async fn list(&self) -> Vec<Job> {
         self.jobs.read().await.values().cloned().collect()
     }
-}
+}
\ No newline at end of file
diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs
index 99d971f..2f54920 100644
--- a/crates/vectord/src/service.rs
+++ b/crates/vectord/src/service.rs
@@ -7,6 +7,7 @@ use axum::{
 };
 use object_store::ObjectStore;
 use serde::{Deserialize, Serialize};
+use serde_json::json;
 use std::sync::Arc;
 
 use aibridge::client::{AiClient, EmbedRequest, GenerateRequest};
@@ -196,7 +197,7 @@ async fn create_index(
     let bucket = req.bucket.clone().unwrap_or_else(|| "primary".to_string());
 
     // Create job and return immediately
-    let job_id = state.job_tracker.create(&index_name, n_chunks).await;
+    let job_id = state.job_tracker.create_embed(&index_name, n_chunks).await;
     tracing::info!("job {job_id}: indexing '{}' — {} docs → {} chunks (background)", index_name, n_docs, n_chunks);
 
     // Spawn supervised dual-pipeline embedding
@@ -240,7 +241,7 @@ async fn create_index(
             };
             let _ = registry.register(meta).await;
 
-            tracker.complete(&jid, key).await;
+            tracker.complete(&jid, Some(json!({ "storage_key": key }))).await;
             tracing::info!("job {jid}: completed — {n_chunks} chunks in {elapsed:.0}s ({rate:.0}/sec)");
         }
         Err(e) => {
@@ -482,7 +483,7 @@ async fn _run_embedding_job_legacy(
         // Update progress
         let elapsed = start.elapsed().as_secs_f32();
         let rate = if elapsed > 0.0 { all_vectors.len() as f32 / elapsed } else { 0.0 };
-        tracker.update_progress(job_id, all_vectors.len(), rate).await;
+        tracker.update_embed_progress(job_id, all_vectors.len(), rate).await;
 
         // Log every 100 batches
         if (i + 1) % 100 == 0 {
@@ -1371,239 +1372,218 @@ async fn activate_profile(
     State(state): State,
     Path(profile_id): Path<String>,
 ) -> impl IntoResponse {
-    let t0 = std::time::Instant::now();
-
+    tracing::info!("[activate_profile] START profile_id={}", profile_id);
     let profile = match state.catalog.get_profile(&profile_id).await {
         Some(p) => p,
         None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
     };
 
-    let mut warmed = Vec::new();
-    let mut failures = Vec::new();
-    let mut total_vectors = 0usize;
+    let job_id = state.job_tracker.create_profile_activation(&profile_id).await;
+    let job_id_for_response = job_id.clone();
+    let tracker = state.job_tracker.clone();
+    let catalog = state.catalog.clone();
+    let index_registry = state.index_registry.clone();
+    let bucket_registry = state.bucket_registry.clone();
+    let lance = state.lance.clone();
+    let embedding_cache = state.embedding_cache.clone();
+    let hnsw_store = state.hnsw_store.clone();
+    let promotion_registry = state.promotion_registry.clone();
+    let ai_client = state.ai_client.clone();
+    let active_profile = state.active_profile.clone();
+    let profile_name = profile.ollama_name.clone();
+    let profile_id_clone = profile.id.clone();
+    let profile_bucket = profile.bucket.clone();
+    let profile_bound = profile.bound_datasets.clone();
+    let profile_hnsw = profile.hnsw_config.clone();
+    let profile_backend = profile.vector_backend.clone();
+    let profile_full = profile.clone();
 
-    // Phase 17 / C: VRAM-aware swap. If another profile currently holds
-    // the GPU and uses a DIFFERENT Ollama model than the one being
-    // activated, unload it first (keep_alive=0). Same-model activations
-    // skip the unload — no point churning a model that's already loaded.
- let previous_slot = { - let guard = state.active_profile.read().await; - guard.clone() - }; - if let Some(prev) = &previous_slot { - if prev.ollama_name != profile.ollama_name { - match state.ai_client.unload_model(&prev.ollama_name).await { - Ok(_) => tracing::info!( - "profile swap: unloaded '{}' ({} -> {})", - prev.ollama_name, prev.profile_id, profile.id, - ), - Err(e) => failures.push(format!( - "unload previous model '{}': {e}", prev.ollama_name, - )), - } - } - } + tokio::spawn(async move { + let t0 = std::time::Instant::now(); + let mut warmed = Vec::new(); + let mut failures = Vec::new(); + let mut total_vectors = 0usize; + let job_id = job_id; - // Federation layer 2: if this profile declares its own bucket and - // that bucket isn't registered yet, auto-provision it under the - // configured profile_root. This is the moment a "dormant" profile - // becomes live — its bucket exists and is readable/writable. - if let Some(bucket_name) = profile.bucket.clone() { - if !state.bucket_registry.contains(&bucket_name) { - let root = format!( - "{}/{}", - state.bucket_registry.profile_root().trim_end_matches('/'), - bucket_name.replace(':', "_"), - ); - let bc = shared::config::BucketConfig { - name: bucket_name.clone(), - backend: "local".to_string(), - root: Some(root.clone()), - bucket: None, - region: None, - endpoint: None, - secret_ref: None, - }; - match state.bucket_registry.add_bucket(bc).await { - Ok(info) => { - tracing::info!( - "profile '{}' activated bucket '{}' (root={}, reachable={})", - profile.id, bucket_name, root, info.reachable, - ); - } - Err(e) => { - failures.push(format!( - "auto-provision bucket '{}': {}", bucket_name, e, - )); + let previous_slot = { + let guard = active_profile.read().await; + guard.clone() + }; + if let Some(prev) = &previous_slot { + if prev.ollama_name != profile_name { + match ai_client.unload_model(&prev.ollama_name).await { + Ok(_) => tracing::info!( + "profile swap: unloaded '{}' ({} -> {})", + prev.ollama_name, prev.profile_id, profile_id_clone, + ), + Err(e) => failures.push(format!("unload previous model '{}': {e}", prev.ollama_name)), } } } - } - let all_indexes = state.index_registry.list(None, None).await; - let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance; - - for binding in &profile.bound_datasets { - let matched: Vec<_> = all_indexes - .iter() - .filter(|m| &m.source == binding) - .collect(); - if matched.is_empty() { - failures.push(format!( - "no vector index found for binding '{}'", binding, - )); - continue; - } - for meta in matched { - if use_lance { - // --- Lance activation path --- - // Ensure a Lance dataset exists for this index. If it - // doesn't, auto-migrate from the Parquet blob. Then - // ensure an IVF_PQ index is built. 
- let bucket = meta.bucket.clone(); - let lance_store = match state.lance.store_for_new(&meta.index_name, &bucket).await { - Ok(s) => s, - Err(e) => { - failures.push(format!("{}: lance store init: {e}", meta.index_name)); - continue; - } + if let Some(bucket_name) = profile_bucket.clone() { + if !bucket_registry.contains(&bucket_name) { + let root = format!( + "{}/{}", + bucket_registry.profile_root().trim_end_matches('/'), + bucket_name.replace(':', "_"), + ); + let bc = shared::config::BucketConfig { + name: bucket_name.clone(), + backend: "local".to_string(), + root: Some(root.clone()), + bucket: None, + region: None, + endpoint: None, + secret_ref: None, }; - let count = lance_store.count().await.unwrap_or(0); - if count == 0 { - // Auto-migrate from existing Parquet. - let pq_store = match state.bucket_registry.get(&bucket) { + match bucket_registry.add_bucket(bc).await { + Ok(info) => { + tracing::info!( + "profile '{}' activated bucket '{}' (root={}, reachable={})", + profile_id_clone, bucket_name, root, info.reachable, + ); + } + Err(e) => failures.push(format!("auto-provision bucket '{}': {}", bucket_name, e)), + } + } + } + + let all_indexes = index_registry.list(None, None).await; + let use_lance = profile_backend == shared::types::VectorBackend::Lance; + + for binding in &profile_bound { + let matched: Vec<_> = all_indexes + .iter() + .filter(|m| &m.source == binding) + .collect(); + if matched.is_empty() { + failures.push(format!("no vector index found for binding '{}'", binding)); + continue; + } + for meta in matched { + if use_lance { + let bucket = meta.bucket.clone(); + let lance_store = match lance.store_for_new(&meta.index_name, &bucket).await { Ok(s) => s, - Err(e) => { failures.push(format!("{}: bucket: {e}", meta.index_name)); continue; } + Err(e) => { failures.push(format!("{}: lance store init: {e}", meta.index_name)); continue; } }; - match storaged::ops::get(&pq_store, &meta.storage_key).await { - Ok(bytes) => { - let build_t = std::time::Instant::now(); - match lance_store.migrate_from_parquet_bytes(&bytes).await { - Ok(ms) => { - total_vectors += ms.rows_written; - tracing::info!( - "lance auto-migrate '{}': {} rows in {:.2}s", - meta.index_name, ms.rows_written, ms.duration_secs, - ); - warmed.push(WarmedIndex { - index_name: meta.index_name.clone(), - source: meta.source.clone(), - vectors: ms.rows_written, - hnsw_build_secs: build_t.elapsed().as_secs_f32(), - }); + let count = lance_store.count().await.unwrap_or(0); + if count == 0 { + let pq_store = match bucket_registry.get(&bucket) { + Ok(s) => s, + Err(e) => { failures.push(format!("{}: bucket: {e}", meta.index_name)); continue; } + }; + match storaged::ops::get(&pq_store, &meta.storage_key).await { + Ok(bytes) => { + let build_t = std::time::Instant::now(); + match lance_store.migrate_from_parquet_bytes(&bytes).await { + Ok(ms) => { + total_vectors += ms.rows_written; + tracing::info!("lance auto-migrate '{}': {} rows in {:.2}s", meta.index_name, ms.rows_written, ms.duration_secs); + warmed.push(WarmedIndex { + index_name: meta.index_name.clone(), + source: meta.source.clone(), + vectors: ms.rows_written, + hnsw_build_secs: build_t.elapsed().as_secs_f32(), + }); + } + Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)), } - Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)), } + Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)), } - Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)), - } - } else { - 
total_vectors += count; - warmed.push(WarmedIndex { - index_name: meta.index_name.clone(), - source: meta.source.clone(), - vectors: count, - hnsw_build_secs: 0.0, - }); - } - // Ensure IVF_PQ vector index exists. - if !lance_store.has_vector_index().await.unwrap_or(false) { - match lance_store.build_index(316, 8, 48).await { - Ok(ix) => tracing::info!( - "lance auto-index '{}': IVF_PQ built in {:.1}s", - meta.index_name, ix.build_time_secs, - ), - Err(e) => failures.push(format!("{}: lance IVF_PQ build: {e}", meta.index_name)), - } - } - // Ensure scalar btree on doc_id for O(log N) random fetch. - if !lance_store.has_scalar_index("doc_id").await.unwrap_or(false) { - match lance_store.build_scalar_index("doc_id").await { - Ok(ix) => tracing::info!( - "lance auto-index '{}': doc_id btree built in {:.2}s", - meta.index_name, ix.build_time_secs, - ), - Err(e) => failures.push(format!("{}: lance doc_id btree: {e}", meta.index_name)), - } - } - } else { - // --- Parquet + HNSW activation path (existing) --- - let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await { - Ok(arc) => arc, - Err(e) => { - failures.push(format!("{}: load failed: {}", meta.index_name, e)); - continue; - } - }; - total_vectors += embeddings.len(); - - let profile_default = trial::HnswConfig { - ef_construction: profile.hnsw_config.ef_construction, - ef_search: profile.hnsw_config.ef_search, - seed: profile.hnsw_config.seed, - }; - let cfg = state - .promotion_registry - .config_or(&meta.index_name, profile_default) - .await; - let build_t = std::time::Instant::now(); - match state - .hnsw_store - .build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg) - .await - { - Ok(_) => { + } else { + total_vectors += count; warmed.push(WarmedIndex { index_name: meta.index_name.clone(), source: meta.source.clone(), - vectors: embeddings.len(), - hnsw_build_secs: build_t.elapsed().as_secs_f32(), + vectors: count, + hnsw_build_secs: 0.0, }); } - Err(e) => { - failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)); + if !lance_store.has_vector_index().await.unwrap_or(false) { + match lance_store.build_index(316, 8, 48).await { + Ok(ix) => tracing::info!("lance auto-index '{}': IVF_PQ built in {:.1}s", meta.index_name, ix.build_time_secs), + Err(e) => failures.push(format!("{}: lance IVF_PQ build: {e}", meta.index_name)), + } + } + if !lance_store.has_scalar_index("doc_id").await.unwrap_or(false) { + match lance_store.build_scalar_index("doc_id").await { + Ok(ix) => tracing::info!("lance auto-index '{}': doc_id btree built in {:.2}s", meta.index_name, ix.build_time_secs), + Err(e) => failures.push(format!("{}: lance doc_id btree: {e}", meta.index_name)), + } + } + } else { + let embeddings = match embedding_cache.get_or_load(&meta.index_name).await { + Ok(arc) => arc, + Err(e) => { failures.push(format!("{}: load failed: {}", meta.index_name, e)); continue; } + }; + total_vectors += embeddings.len(); + + let profile_default = trial::HnswConfig { + ef_construction: profile_hnsw.ef_construction, + ef_search: profile_hnsw.ef_search, + seed: profile_hnsw.seed, + }; + let cfg = promotion_registry + .config_or(&meta.index_name, profile_default) + .await; + let build_t = std::time::Instant::now(); + match hnsw_store + .build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg) + .await + { + Ok(_) => { + warmed.push(WarmedIndex { + index_name: meta.index_name.clone(), + source: meta.source.clone(), + vectors: embeddings.len(), + hnsw_build_secs: 
build_t.elapsed().as_secs_f32(), + }); + } + Err(e) => failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)), } } } } - } - // Preload the new profile's Ollama model proactively. Same-model - // re-activations are cheap (Ollama no-ops if already loaded). - let mut model_preloaded = false; - match state.ai_client.preload_model(&profile.ollama_name).await { - Ok(_) => { - model_preloaded = true; - tracing::info!("profile '{}' preloaded ollama model '{}'", - profile.id, profile.ollama_name); + let mut model_preloaded = false; + match ai_client.preload_model(&profile_name).await { + Ok(_) => { + model_preloaded = true; + tracing::info!("profile '{}' preloaded ollama model '{}'", profile_id_clone, profile_name); + } + Err(e) => failures.push(format!("preload ollama model '{}': {e}", profile_name)), } - Err(e) => failures.push(format!( - "preload ollama model '{}': {e}", profile.ollama_name, - )), - } - // Take the GPU slot. - { - let mut guard = state.active_profile.write().await; - *guard = Some(ActiveProfileSlot { - profile_id: profile.id.clone(), - ollama_name: profile.ollama_name.clone(), - activated_at: chrono::Utc::now(), - }); - } + { + let mut guard = active_profile.write().await; + *guard = Some(ActiveProfileSlot { + profile_id: profile_id_clone.clone(), + ollama_name: profile_name.clone(), + activated_at: chrono::Utc::now(), + }); + } - let report = ActivateReport { - profile_id: profile.id, - ollama_name: profile.ollama_name, - indexes_warmed: warmed, - failures, - total_vectors, - duration_secs: t0.elapsed().as_secs_f32(), - model_preloaded, - previous_profile: previous_slot.map(|s| s.profile_id), - }; + let result = serde_json::to_value(ActivateReport { + profile_id: profile_id_clone, + ollama_name: profile_name, + indexes_warmed: warmed, + failures, + total_vectors, + duration_secs: t0.elapsed().as_secs_f32(), + model_preloaded, + previous_profile: previous_slot.map(|s| s.profile_id), + }).ok(); - Ok(Json(report)) + tracker.complete(&job_id, result).await; + }); + + Ok(Json(json!({ + "job_id": job_id_for_response, + "message": format!("profile activation started — poll /vectors/jobs/{} for progress", job_id_for_response), + }))) } /// Unload this profile's model and clear the active slot. No-op if the diff --git a/crates/vectord/src/supervisor.rs b/crates/vectord/src/supervisor.rs index 12ca141..defb4a7 100644 --- a/crates/vectord/src/supervisor.rs +++ b/crates/vectord/src/supervisor.rs @@ -170,7 +170,7 @@ pub async fn run_supervised( // Update job tracker let elapsed = start_time.elapsed().as_secs_f32(); let rate = if elapsed > 0.0 { done as f32 / elapsed } else { 0.0 }; - tracker.update_progress(&jid, done, rate).await; + tracker.update_embed_progress(&jid, done, rate).await; // Update checkpoint let mut ckpt = ckpt.write().await; diff --git a/docs/PHASES.md b/docs/PHASES.md index 2d7c8a8..4e20b9d 100644 --- a/docs/PHASES.md +++ b/docs/PHASES.md @@ -324,6 +324,16 @@ - Mem0-style upsert: `/seed` with `append=true` (default) routes through `upsert_entry`, which decides ADD / UPDATE / NOOP on (operation, day, city, state). Same-day re-seed merges names (union, stable order) instead of duplicating the row. Identical re-seed is a no-op. Different-day same-operation is a fresh ADD. Playbook_id stays stable on UPDATE so prior citations remain valid. - Letta-style hot cache: `PlaybookMemory` now holds a `geo_index: HashMap<(city_lower, state_upper), Vec>` rebuilt on every mutation. Geo-filtered boost queries skip the full scan and hit the O(1) key lookup. 
At 1.9K entries the full scan was sub-ms; the index scales the same path to 100K+ without code changes.
 - `UpsertOutcome` enum reported back to callers — `{mode: added|updated|noop, playbook_id, merged_names?}` + `entries_after`.
+- [x] **Phase 37: Hot-swap async** (2026-04-22)
+  - Extended `JobTracker` with a `JobType` enum (`Embed`, `ProfileActivation`)
+  - Made `activate_profile` return a `job_id` immediately; the work runs in the background via `tokio::spawn`
+  - Background jobs tracked via `GET /vectors/jobs/{id}` + `GET /vectors/jobs`
+- [x] **Phase 38: Universal API Skeleton** (2026-04-23)
+  - `/v1/chat` — OpenAI-compatible POST, forwards to local Ollama or Ollama Cloud
+  - `/v1/usage` — returns `{requests, prompt_tokens, completion_tokens, total_tokens}`
+  - `/v1/sessions` — returns `{data: [], note: "Phase 38: stateless"}`
+  - Langfuse trace integration (fire-and-forget, Phase 40 early)
+  - 12 unit tests green, curl gates pass
 - [ ] Fine-tuned domain models (Phase 25+)
 - [ ] Multi-node query distribution (only if ceilings bite)
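
A minimal client-side sketch of the flow the Phase 37/38 entries describe: activation now returns a `job_id` at once, the caller polls the job tracker, and the new `/v1/chat` route accepts an OpenAI-style request. The bind address, the activation route (`POST /profiles/{id}/activate`), the profile id `geo-profile`, the model name, and the exact `/v1/chat` body are assumptions for illustration and are not confirmed by this patch; the `job_id`, `status`, `result`, and `error` fields follow the `Job` struct and `activate_profile` handler above. Assumes `tokio`, `reqwest` (with its `json` feature), and `serde_json` as dependencies.

```rust
// Sketch of a client driving the Phase 37 async activation flow, then one
// Phase 38 chat call. Routes, profile id, and model name below are assumed.
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = "http://localhost:8080"; // assumed vectord address
    let client = reqwest::Client::new();

    // Phase 37: activation returns { job_id, message } immediately instead of blocking.
    let started: serde_json::Value = client
        .post(format!("{base}/profiles/geo-profile/activate")) // route and id assumed
        .send()
        .await?
        .json()
        .await?;
    let job_id = started["job_id"].as_str().unwrap_or_default().to_string();
    println!("activation job: {job_id}");

    // Poll GET /vectors/jobs/{id} until the background task settles.
    loop {
        let job: serde_json::Value = client
            .get(format!("{base}/vectors/jobs/{job_id}"))
            .send()
            .await?
            .json()
            .await?;
        match job["status"].as_str() {
            // `result` carries the serialized ActivateReport on success.
            Some("completed") => { println!("warmed: {}", job["result"]); break; }
            Some("failed") => { println!("failed: {}", job["error"]); break; }
            _ => tokio::time::sleep(Duration::from_secs(2)).await,
        }
    }

    // Phase 38: OpenAI-compatible chat request against the new gateway route.
    let reply: serde_json::Value = client
        .post(format!("{base}/v1/chat"))
        .json(&serde_json::json!({
            "model": "llama3.1", // assumed model name
            "messages": [{ "role": "user", "content": "ping" }]
        }))
        .send()
        .await?
        .json()
        .await?;
    println!("chat reply: {reply}");
    Ok(())
}
```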