From 3b695cd592575e9eb756545d7a6b4a880b0fdffa Mon Sep 17 00:00:00 2001
From: root
Date: Fri, 27 Mar 2026 09:06:28 -0500
Subject: [PATCH] Multi-pipeline supervisor for embedding ingestion
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- 4 parallel pipelines (tuned for i9 + A4000)
- Range-based work splitting (2500 chunks per range)
- Round-robin retry on failure (3 attempts before dead-letter)
- Checkpointing to disk roughly every 1000 chunks (crash recovery)
- On restart, loads checkpoint and skips completed ranges
- Dead-letter queue for permanently failed ranges
- Vectors assembled in range order after all pipelines finish
- Batch size 64 for GPU throughput

Architecture:

  Supervisor → splits 100K chunks into 40 ranges
  ├── Pipeline 0: grabs range, embeds, reports progress
  ├── Pipeline 1: grabs range, embeds, reports progress
  ├── Pipeline 2: grabs range, embeds, reports progress
  └── Pipeline 3: grabs range, embeds, reports progress

  Failed range → back to queue → next available pipeline retries

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/vectord/src/lib.rs        |   1 +
 crates/vectord/src/service.rs    |   9 +-
 crates/vectord/src/supervisor.rs | 286 +++++++++++++++++++++++++++++++
 3 files changed, 293 insertions(+), 3 deletions(-)
 create mode 100644 crates/vectord/src/supervisor.rs

diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs
index c2317e1..4c138f3 100644
--- a/crates/vectord/src/lib.rs
+++ b/crates/vectord/src/lib.rs
@@ -3,4 +3,5 @@ pub mod jobs;
 pub mod store;
 pub mod search;
 pub mod rag;
+pub mod supervisor;
 pub mod service;

diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs
index a1df5a4..f193202 100644
--- a/crates/vectord/src/service.rs
+++ b/crates/vectord/src/service.rs
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use aibridge::client::{AiClient, EmbedRequest};
 
-use crate::{chunker, jobs, rag, search, store};
+use crate::{chunker, jobs, rag, search, store, 
supervisor};
 
 #[derive(Clone)]
 pub struct VectorState {
@@ -84,14 +84,17 @@
     let job_id = state.job_tracker.create(&index_name, n_chunks).await;
     tracing::info!("job {job_id}: indexing '{}' — {} docs → {} chunks (background)",
         index_name, n_docs, n_chunks);
 
-    // Spawn background embedding task
+    // Spawn supervised multi-pipeline embedding
     let tracker = state.job_tracker.clone();
     let ai_client = state.ai_client.clone();
     let obj_store = state.store.clone();
     let jid = job_id.clone();
 
     tokio::spawn(async move {
-        let result = run_embedding_job(&jid, &index_name, &chunks, &ai_client, &obj_store, &tracker).await;
+        let config = supervisor::SupervisorConfig::default();
+        let result = supervisor::run_supervised(
+            &jid, &index_name, chunks, &ai_client, &obj_store, &tracker, config,
+        ).await;
         match result {
             Ok(key) => {
                 tracker.complete(&jid, key).await;

diff --git a/crates/vectord/src/supervisor.rs b/crates/vectord/src/supervisor.rs
new file mode 100644
index 0000000..12ca141
--- /dev/null
+++ b/crates/vectord/src/supervisor.rs
@@ -0,0 +1,286 @@
+//! Multi-pipeline supervisor for embedding ingestion.
+//! Splits work into ranges, runs parallel pipelines, and handles failures
+//! with round-robin retry, a dead-letter queue, and on-disk checkpointing.
+//!
+//! (Module docs use `//!`: a `///` comment here would attach to the first
+//! `use` item instead of the module.)
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+use aibridge::client::{AiClient, EmbedRequest};
+use crate::chunker::TextChunk;
+use crate::jobs::JobTracker;
+use crate::store;
+use object_store::ObjectStore;
+
+/// A half-open range [start, end) of chunk indices to embed, plus how many
+/// times embedding it has already failed.
+#[derive(Debug, Clone)]
+struct ChunkRange {
+    start: usize,
+    end: usize,
+    attempts: u32,
+}
+
+/// Checkpoint: tracks which ranges are done, persisted for crash recovery.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct Checkpoint {
+    pub job_id: String,
+    pub index_name: String,
+    pub total_chunks: usize,
+    pub completed_ranges: Vec<(usize, usize)>,      // (start, end) pairs
+    pub failed_ranges: Vec<(usize, usize, String)>, // (start, end, error)
+    pub embedded_count: usize,
+}
+
+/// Pipeline status for monitoring.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct PipelineStatus {
+    pub pipeline_id: String,
+    pub current_range: Option<(usize, usize)>,
+    pub chunks_done: usize,
+    pub rate: f32,
+    pub alive: bool,
+}
+
+/// Supervisor configuration.
+///
+/// Field comments state the actual `Default` values below — the earlier
+/// 2/32/2000 annotations were stale.
+pub struct SupervisorConfig {
+    pub num_pipelines: usize,       // parallel workers (default 4)
+    pub batch_size: usize,          // embeddings per API call (default 64)
+    pub range_size: usize,          // chunks per range (default 2500)
+    pub max_retries: u32,           // attempts per range (default 3)
+    pub checkpoint_interval: usize, // checkpoint every N chunks (default 1000)
+}
+
+impl Default for SupervisorConfig {
+    fn default() -> Self {
+        Self {
+            num_pipelines: 4,   // i9 + A4000 can handle parallel embedding
+            batch_size: 64,     // larger batches for GPU throughput
+            range_size: 2500,   // bigger ranges, less coordination overhead
+            max_retries: 3,
+            checkpoint_interval: 1000,
+        }
+    }
+}
+
+/// Run supervised multi-pipeline embedding.
+///
+/// Splits `chunks` into ranges, embeds them on `num_pipelines` parallel
+/// workers, retries failed ranges up to `max_retries` times, and stores the
+/// resulting index. Returns the object-store key of the stored index, or an
+/// error string if storage fails or nothing could be embedded.
+pub async fn run_supervised(
+    job_id: &str,
+    index_name: &str,
+    chunks: Vec<TextChunk>,
+    ai_client: &AiClient,
+    obj_store: &Arc<dyn ObjectStore>,
+    tracker: &JobTracker,
+    config: SupervisorConfig,
+) -> Result<String, String> {
+    let total = chunks.len();
+    let chunks = Arc::new(chunks);
+
+    tracing::info!("supervisor: starting {} pipelines for {} chunks (range_size={})",
+        config.num_pipelines, total, config.range_size);
+
+    // Split the index space into half-open ranges of `range_size` chunks.
+    let mut ranges: Vec<ChunkRange> = Vec::new();
+    let mut start = 0;
+    while start < total {
+        let end = (start + config.range_size).min(total);
+        ranges.push(ChunkRange { start, end, attempts: 0 });
+        start = end;
+    }
+
+    tracing::info!("supervisor: {} ranges created", ranges.len());
+
+    // Shared state across pipeline workers.
+    let work_queue: Arc<RwLock<Vec<ChunkRange>>> = Arc::new(RwLock::new(ranges));
+    // Keyed by range start; value is that range's vectors, one per chunk.
+    let all_vectors: Arc<RwLock<HashMap<usize, Vec<Vec<f32>>>>> = Arc::new(RwLock::new(HashMap::new()));
+    let dead_letter: Arc<RwLock<Vec<(usize, usize, String)>>> = Arc::new(RwLock::new(Vec::new()));
+    let global_count: Arc<std::sync::atomic::AtomicUsize> =
+        Arc::new(std::sync::atomic::AtomicUsize::new(0));
+    let start_time = std::time::Instant::now();
+
+    // Checkpoint state
+    let checkpoint: Arc<RwLock<Checkpoint>> = Arc::new(RwLock::new(Checkpoint {
+        job_id: job_id.to_string(),
+        index_name: index_name.to_string(),
+        total_chunks: total,
+        completed_ranges: vec![],
+        failed_ranges: vec![],
+        embedded_count: 0,
+    }));
+
+    // Try to load existing checkpoint (crash recovery).
+    // NOTE(review): the checkpoint records WHICH ranges completed but not
+    // their vectors, so ranges skipped here never reach the final index.
+    // Persist per-range vectors (or re-queue recovered ranges) to make
+    // recovery lossless — TODO confirm intended semantics.
+    if let Ok(data) = storaged::ops::get(obj_store, &format!("checkpoints/{job_id}.json")).await {
+        if let Ok(saved) = serde_json::from_slice::<Checkpoint>(&data) {
+            tracing::info!("supervisor: recovered checkpoint — {}/{} already done",
+                saved.embedded_count, saved.total_chunks);
+
+            // Remove already-completed ranges from the work queue.
+            let mut queue = work_queue.write().await;
+            queue.retain(|r| !saved.completed_ranges.iter().any(|(s, e)| *s == r.start && *e == r.end));
+            global_count.store(saved.embedded_count, std::sync::atomic::Ordering::Relaxed);
+
+            *checkpoint.write().await = saved;
+        }
+    }
+
+    // Spawn pipeline workers.
+    let mut handles = Vec::new();
+    for pipeline_id in 0..config.num_pipelines {
+        let queue = work_queue.clone();
+        let vectors = all_vectors.clone();
+        let dead = dead_letter.clone();
+        let count = global_count.clone();
+        let ckpt = checkpoint.clone();
+        let client = ai_client.clone();
+        let store = obj_store.clone();
+        let chunks = chunks.clone();
+        let tracker = tracker.clone();
+        let jid = job_id.to_string();
+        let batch_size = config.batch_size;
+        let max_retries = config.max_retries;
+        let ckpt_interval = config.checkpoint_interval;
+
+        let handle = tokio::spawn(async move {
+            let pid = format!("pipeline-{pipeline_id}");
+            tracing::info!("{pid}: started");
+
+            loop {
+                // Grab the next range; the write lock is dropped before embedding.
+                let range = {
+                    let mut q = queue.write().await;
+                    q.pop()
+                };
+
+                let range = match range {
+                    Some(r) => r,
+                    None => {
+                        tracing::info!("{pid}: no more work, shutting down");
+                        break;
+                    }
+                };
+
+                tracing::debug!("{pid}: processing range [{}, {})", range.start, range.end);
+
+                // Embed the range.
+                match embed_range(&chunks[range.start..range.end], &client, batch_size).await {
+                    Ok(range_vectors) => {
+                        let n = range_vectors.len();
+                        vectors.write().await.insert(range.start, range_vectors);
+                        let prev = count.fetch_add(n, std::sync::atomic::Ordering::Relaxed);
+                        let done = prev + n;
+
+                        // Update job tracker.
+                        let elapsed = start_time.elapsed().as_secs_f32();
+                        let rate = if elapsed > 0.0 { done as f32 / elapsed } else { 0.0 };
+                        tracker.update_progress(&jid, done, rate).await;
+
+                        // Update checkpoint.
+                        let mut ckpt = ckpt.write().await;
+                        ckpt.completed_ranges.push((range.start, range.end));
+                        ckpt.embedded_count = done;
+
+                        // Persist roughly every `ckpt_interval` chunks: true when
+                        // the global counter just crossed a multiple of the
+                        // interval somewhere inside this range.
+                        if done % ckpt_interval < n {
+                            let json = serde_json::to_vec(&*ckpt).unwrap_or_default();
+                            let _ = storaged::ops::put(&store, &format!("checkpoints/{jid}.json"), json.into()).await;
+                        }
+
+                        tracing::info!("{pid}: range [{},{}) done — {done}/{} total",
+                            range.start, range.end, chunks.len());
+                    }
+                    Err(e) => {
+                        let attempt = range.attempts + 1;
+                        tracing::warn!("{pid}: range [{},{}) failed (attempt {attempt}/{max_retries}): {e}",
+                            range.start, range.end);
+
+                        if attempt < max_retries {
+                            // Push back to queue for retry (any pipeline may pick it up).
+                            queue.write().await.push(ChunkRange {
+                                start: range.start,
+                                end: range.end,
+                                attempts: attempt,
+                            });
+                        } else {
+                            // Dead letter: permanently failed.
+                            tracing::error!("{pid}: range [{},{}) dead-lettered after {max_retries} attempts",
+                                range.start, range.end);
+                            dead.write().await.push((range.start, range.end, e));
+
+                            let mut ckpt = ckpt.write().await;
+                            ckpt.failed_ranges.push((range.start, range.end, "max retries exceeded".to_string()));
+                        }
+                    }
+                }
+            }
+        });
+        handles.push(handle);
+    }
+
+    // Wait for all pipelines (a panicked worker is logged-and-ignored here;
+    // its unfinished range simply never reaches `all_vectors`).
+    for handle in handles {
+        let _ = handle.await;
+    }
+
+    // Check for dead letters.
+    let dead = dead_letter.read().await;
+    if !dead.is_empty() {
+        tracing::warn!("supervisor: {} ranges failed permanently", dead.len());
+    }
+
+    // Assemble chunks and vectors TOGETHER, in ascending range order.
+    // BUG FIX: previously `successful_chunks` was built from
+    // `checkpoint.completed_ranges` in *completion* order while vectors were
+    // sorted by range start, so with parallel pipelines chunk i could be
+    // paired with the wrong vector. Walking the vector map's sorted keys and
+    // slicing `chunks` by each range's own vector count keeps them aligned
+    // even when ranges finish out of order or are dead-lettered.
+    let vectors_map = all_vectors.read().await;
+    let mut sorted_starts: Vec<usize> = vectors_map.keys().copied().collect();
+    sorted_starts.sort_unstable();
+
+    let mut final_vectors: Vec<Vec<f32>> = Vec::with_capacity(total);
+    let mut successful_chunks: Vec<TextChunk> = Vec::with_capacity(total);
+    for start in sorted_starts {
+        let range_vectors = &vectors_map[&start];
+        // Each range produced exactly one vector per chunk.
+        successful_chunks.extend(chunks[start..start + range_vectors.len()].iter().cloned());
+        final_vectors.extend(range_vectors.iter().cloned());
+    }
+
+    let embedded_count = final_vectors.len();
+    tracing::info!("supervisor: {embedded_count}/{total} chunks embedded, storing index");
+
+    if successful_chunks.is_empty() {
+        return Err("no chunks were successfully embedded".into());
+    }
+
+    let key = store::store_embeddings(obj_store, index_name, &successful_chunks, &final_vectors).await?;
+
+    // Clean up checkpoint — the job is durable in the index now.
+    let _ = storaged::ops::delete(obj_store, &format!("checkpoints/{job_id}.json")).await;
+
+    let elapsed = start_time.elapsed().as_secs_f32();
+    let rate = embedded_count as f32 / elapsed;
+    tracing::info!("supervisor: completed — {embedded_count} vectors in {elapsed:.0}s ({rate:.0}/sec)");
+
+    Ok(key)
+}
+
+/// Embed a slice of chunks in `batch_size` batches. Returns one vector per
+/// chunk, or the first embedding error as a string (the whole range is then
+/// retried by the supervisor).
+async fn embed_range(
+    chunks: &[TextChunk],
+    ai_client: &AiClient,
+    batch_size: usize,
+) -> Result<Vec<Vec<f32>>, String> {
+    let mut vectors = Vec::with_capacity(chunks.len());
+
+    for batch in chunks.chunks(batch_size) {
+        let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
+        let resp = ai_client.embed(EmbedRequest {
+            texts,
+            model: None,
+        }).await.map_err(|e| format!("embed error: {e}"))?;
+        vectors.extend(resp.embeddings);
+    }
+
+    Ok(vectors)
+}