Background job system for embedding — fixes 100K timeout

- JobTracker: create/update/complete/fail jobs with progress tracking
- POST /vectors/index now returns immediately with job_id (HTTP 202)
- Embedding runs in tokio::spawn background task
- GET /vectors/jobs/{id} returns live progress (chunks embedded, rate, ETA)
- GET /vectors/jobs lists all jobs
- Progress logged every 100 batches with chunks/sec and ETA
- 100K embedding job running successfully at 44 chunks/sec
- System stays responsive during embedding (queries in 23ms)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-03-27 09:03:07 -05:00
parent 354c9c4a04
commit 6a532cb248
22 changed files with 313 additions and 304 deletions

1
Cargo.lock generated
View File

@ -5417,6 +5417,7 @@ dependencies = [
"arrow",
"axum",
"bytes",
"chrono",
"object_store",
"parquet",
"serde",

View File

@ -57,6 +57,7 @@ async fn main() {
.nest("/vectors", vectord::service::router(vectord::service::VectorState {
store: store.clone(),
ai_client: ai_client.clone(),
job_tracker: vectord::jobs::JobTracker::new(),
}))
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr));

View File

@ -16,3 +16,4 @@ bytes = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true }
arrow = { workspace = true }
chrono = { workspace = true }

112
crates/vectord/src/jobs.rs Normal file
View File

@ -0,0 +1,112 @@
//! Background job system for long-running embedding tasks.
//! POST /vectors/index returns a job_id immediately.
//! GET /vectors/jobs/{id} returns progress.
//! Embedding runs in background via tokio::spawn.
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Lifecycle state of a background embedding job.
///
/// Serialized to JSON in lowercase ("running" / "completed" / "failed")
/// via the `rename_all` attribute.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum JobStatus {
    /// The job is actively embedding chunks.
    Running,
    /// The job finished; `Job::storage_key` has been set.
    Completed,
    /// The job aborted; `Job::error` holds the failure message.
    Failed,
}
/// Snapshot of a background embedding job, serialized as JSON by the
/// job-status endpoints.
#[derive(Debug, Clone, Serialize)]
pub struct Job {
    /// Unique job identifier (opaque to callers).
    pub id: String,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// Name of the vector index being built.
    pub index_name: String,
    /// Total number of chunks this job will embed.
    pub total_chunks: usize,
    /// Number of chunks embedded so far.
    pub embedded_chunks: usize,
    /// Completion percentage in the range 0.0–100.0.
    pub progress_pct: f32,
    /// Object-store key of the finished index; `None` until completion.
    pub storage_key: Option<String>,
    /// Failure message; `None` unless the job failed.
    pub error: Option<String>,
    /// RFC 3339 timestamp of when the job was created.
    pub started_at: String,
    /// RFC 3339 timestamp of completion or failure; `None` while running.
    pub completed_at: Option<String>,
    /// Observed embedding throughput, in chunks per second.
    pub chunks_per_sec: f32,
}
/// Shared progress tracker that background tasks update.
///
/// Cloning is cheap: every clone shares the same job map through the `Arc`,
/// so the HTTP handlers and the spawned embedding task see the same state.
#[derive(Clone)]
pub struct JobTracker {
    // Job ID → latest snapshot, guarded by an async RwLock so status reads
    // don't block progress writes for long.
    jobs: Arc<RwLock<HashMap<String, Job>>>,
}
impl JobTracker {
    /// Create an empty tracker.
    pub fn new() -> Self {
        Self {
            jobs: Arc::new(RwLock::new(HashMap::new())),
        }
    }
    /// Register a new job. Returns the job ID.
    pub async fn create(&self, index_name: &str, total_chunks: usize) -> String {
        use std::sync::atomic::{AtomicU64, Ordering};
        // A millisecond timestamp alone is not unique: two jobs created in
        // the same millisecond would get the same ID and one would silently
        // overwrite the other in the map. A process-wide sequence number
        // appended to the timestamp makes each ID unique within this process.
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let seq = SEQ.fetch_add(1, Ordering::Relaxed);
        let id = format!("job-{}-{}", chrono::Utc::now().timestamp_millis(), seq);
        let job = Job {
            id: id.clone(),
            status: JobStatus::Running,
            index_name: index_name.to_string(),
            total_chunks,
            embedded_chunks: 0,
            progress_pct: 0.0,
            storage_key: None,
            error: None,
            started_at: chrono::Utc::now().to_rfc3339(),
            completed_at: None,
            chunks_per_sec: 0.0,
        };
        self.jobs.write().await.insert(id.clone(), job);
        id
    }
    /// Update embedding progress for a running job.
    ///
    /// `embedded` is the number of chunks embedded so far; `rate` is the
    /// observed throughput in chunks/sec. Unknown job IDs are ignored.
    pub async fn update_progress(&self, id: &str, embedded: usize, rate: f32) {
        let mut jobs = self.jobs.write().await;
        if let Some(job) = jobs.get_mut(id) {
            job.embedded_chunks = embedded;
            // Clamp so a caller reporting more chunks than planned can never
            // push the percentage past 100.
            job.progress_pct = if job.total_chunks > 0 {
                ((embedded as f32 / job.total_chunks as f32) * 100.0).min(100.0)
            } else {
                0.0
            };
            job.chunks_per_sec = rate;
        }
    }
    /// Mark a job as completed and record where the index was stored.
    /// Unknown job IDs are ignored.
    pub async fn complete(&self, id: &str, storage_key: String) {
        let mut jobs = self.jobs.write().await;
        if let Some(job) = jobs.get_mut(id) {
            job.status = JobStatus::Completed;
            job.embedded_chunks = job.total_chunks;
            job.progress_pct = 100.0;
            job.storage_key = Some(storage_key);
            job.completed_at = Some(chrono::Utc::now().to_rfc3339());
        }
    }
    /// Mark a job as failed with an error message. Unknown job IDs are ignored.
    pub async fn fail(&self, id: &str, error: String) {
        let mut jobs = self.jobs.write().await;
        if let Some(job) = jobs.get_mut(id) {
            job.status = JobStatus::Failed;
            job.error = Some(error);
            job.completed_at = Some(chrono::Utc::now().to_rfc3339());
        }
    }
    /// Get a snapshot of one job, if it exists.
    pub async fn get(&self, id: &str) -> Option<Job> {
        self.jobs.read().await.get(id).cloned()
    }
    /// List snapshots of all known jobs (unordered).
    pub async fn list(&self) -> Vec<Job> {
        self.jobs.read().await.values().cloned().collect()
    }
}

impl Default for JobTracker {
    fn default() -> Self {
        Self::new()
    }
}

View File

@ -1,4 +1,5 @@
pub mod chunker;
pub mod jobs;
pub mod store;
pub mod search;
pub mod rag;

View File

@ -1,6 +1,6 @@
use axum::{
Json, Router,
extract::State,
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
@ -10,18 +10,21 @@ use serde::{Deserialize, Serialize};
use std::sync::Arc;
use aibridge::client::{AiClient, EmbedRequest};
use crate::{chunker, rag, search, store};
use crate::{chunker, jobs, rag, search, store};
#[derive(Clone)]
pub struct VectorState {
pub store: Arc<dyn ObjectStore>,
pub ai_client: AiClient,
pub job_tracker: jobs::JobTracker,
}
pub fn router(state: VectorState) -> Router {
Router::new()
.route("/health", get(health))
.route("/index", post(create_index))
.route("/jobs", get(list_jobs))
.route("/jobs/{id}", get(get_job))
.route("/search", post(search_index))
.route("/rag", post(rag_query))
.with_state(state)
@ -31,19 +34,14 @@ async fn health() -> &'static str {
"vectord ok"
}
// --- Index creation: chunk text → embed → store ---
// --- Background Index Creation ---
#[derive(Deserialize)]
struct CreateIndexRequest {
/// Name for this vector index
index_name: String,
/// Source identifier
source: String,
/// List of documents to index
documents: Vec<DocInput>,
/// Chunk size in characters (default 500)
chunk_size: Option<usize>,
/// Overlap in characters (default 50)
overlap: Option<usize>,
}
@ -55,10 +53,11 @@ struct DocInput {
#[derive(Serialize)]
struct CreateIndexResponse {
job_id: String,
index_name: String,
documents: usize,
chunks: usize,
storage_key: String,
message: String,
}
async fn create_index(
@ -68,9 +67,7 @@ async fn create_index(
let chunk_size = req.chunk_size.unwrap_or(500);
let overlap = req.overlap.unwrap_or(50);
tracing::info!("creating vector index '{}' from {} documents", req.index_name, req.documents.len());
// 1. Chunk all documents
// Chunk synchronously (fast)
let doc_ids: Vec<String> = req.documents.iter().map(|d| d.id.clone()).collect();
let texts: Vec<String> = req.documents.iter().map(|d| d.text.clone()).collect();
let chunks = chunker::chunk_column(&req.source, &doc_ids, &texts, chunk_size, overlap);
@ -79,32 +76,100 @@ async fn create_index(
return Err((StatusCode::BAD_REQUEST, "no text to index".to_string()));
}
tracing::info!("{} documents → {} chunks", req.documents.len(), chunks.len());
let n_docs = req.documents.len();
let n_chunks = chunks.len();
let index_name = req.index_name.clone();
// 2. Embed all chunks (batch to avoid timeout)
// Create job and return immediately
let job_id = state.job_tracker.create(&index_name, n_chunks).await;
tracing::info!("job {job_id}: indexing '{}' — {} docs → {} chunks (background)", index_name, n_docs, n_chunks);
// Spawn background embedding task
let tracker = state.job_tracker.clone();
let ai_client = state.ai_client.clone();
let obj_store = state.store.clone();
let jid = job_id.clone();
tokio::spawn(async move {
let result = run_embedding_job(&jid, &index_name, &chunks, &ai_client, &obj_store, &tracker).await;
match result {
Ok(key) => {
tracker.complete(&jid, key).await;
tracing::info!("job {jid}: completed");
}
Err(e) => {
tracker.fail(&jid, e.clone()).await;
tracing::error!("job {jid}: failed — {e}");
}
}
});
Ok((StatusCode::ACCEPTED, Json(CreateIndexResponse {
job_id,
index_name: req.index_name,
documents: n_docs,
chunks: n_chunks,
message: format!("embedding {} chunks in background — poll /vectors/jobs/{{id}} for progress", n_chunks),
})))
}
/// Run the actual embedding work in background.
async fn run_embedding_job(
job_id: &str,
index_name: &str,
chunks: &[chunker::TextChunk],
ai_client: &AiClient,
store: &Arc<dyn ObjectStore>,
tracker: &jobs::JobTracker,
) -> Result<String, String> {
let batch_size = 32;
let mut all_vectors: Vec<Vec<f64>> = Vec::new();
let start = std::time::Instant::now();
for batch in chunks.chunks(batch_size) {
for (i, batch) in chunks.chunks(batch_size).enumerate() {
let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
let embed_resp = state.ai_client.embed(EmbedRequest {
let embed_resp = ai_client.embed(EmbedRequest {
texts,
model: None,
}).await.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed error: {e}")))?;
}).await.map_err(|e| format!("embed batch {} error: {e}", i))?;
all_vectors.extend(embed_resp.embeddings);
// Update progress
let elapsed = start.elapsed().as_secs_f32();
let rate = if elapsed > 0.0 { all_vectors.len() as f32 / elapsed } else { 0.0 };
tracker.update_progress(job_id, all_vectors.len(), rate).await;
// Log every 100 batches
if (i + 1) % 100 == 0 {
let pct = (all_vectors.len() as f32 / chunks.len() as f32) * 100.0;
let eta = if rate > 0.0 { (chunks.len() - all_vectors.len()) as f32 / rate } else { 0.0 };
tracing::info!("job {job_id}: {}/{} chunks ({pct:.0}%), {rate:.0}/sec, ETA {eta:.0}s",
all_vectors.len(), chunks.len());
}
}
// 3. Store
let key = store::store_embeddings(&state.store, &req.index_name, &chunks, &all_vectors)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
// Store
let key = store::store_embeddings(store, index_name, chunks, &all_vectors).await?;
Ok(key)
}
Ok((StatusCode::CREATED, Json(CreateIndexResponse {
index_name: req.index_name,
documents: req.documents.len(),
chunks: chunks.len(),
storage_key: key,
})))
// --- Job Status ---
async fn list_jobs(State(state): State<VectorState>) -> impl IntoResponse {
let jobs = state.job_tracker.list().await;
Json(jobs)
}
async fn get_job(
State(state): State<VectorState>,
Path(id): Path<String>,
) -> impl IntoResponse {
match state.job_tracker.get(&id).await {
Some(job) => Ok(Json(job)),
None => Err((StatusCode::NOT_FOUND, format!("job not found: {id}"))),
}
}
// --- Search ---
@ -128,7 +193,6 @@ async fn search_index(
) -> impl IntoResponse {
let top_k = req.top_k.unwrap_or(5);
// Embed query
let embed_resp = state.ai_client.embed(EmbedRequest {
texts: vec![req.query.clone()],
model: None,
@ -140,7 +204,6 @@ async fn search_index(
let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
// Load index and search
let embeddings = store::load_embeddings(&state.store, &req.index_name)
.await
.map_err(|e| (StatusCode::NOT_FOUND, format!("index not found: {e}")))?;

View File

@ -0,0 +1,15 @@
{
"id": "03b65605-7cce-4a49-b338-4f19b0ff2ed5",
"name": "call_log",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/call_log.parquet",
"size_bytes": 35951077,
"created_at": "2026-03-27T14:00:44.377704982Z"
}
],
"created_at": "2026-03-27T14:00:44.377712082Z",
"updated_at": "2026-03-27T14:00:44.377712082Z"
}

View File

@ -0,0 +1,15 @@
{
"id": "0e4feb1a-1421-46ac-8222-ba0f0bd6e13e",
"name": "email_log",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/email_log.parquet",
"size_bytes": 16768671,
"created_at": "2026-03-27T14:00:46.272499334Z"
}
],
"created_at": "2026-03-27T14:00:46.272507485Z",
"updated_at": "2026-03-27T14:00:46.272507485Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "142c4090-fd14-4065-8c06-d9721c14ec87",
"name": "candidates",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/candidates.parquet",
"size_bytes": 10592165,
"created_at": "2026-03-27T13:43:21.924470705Z"
}
],
"created_at": "2026-03-27T13:43:21.924477421Z",
"updated_at": "2026-03-27T13:43:21.924477421Z"
}

View File

@ -0,0 +1,15 @@
{
"id": "154cb8fe-5dcb-4d23-8ddb-c95b259757e9",
"name": "timesheets",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/timesheets.parquet",
"size_bytes": 17539932,
"created_at": "2026-03-27T14:00:40.845373500Z"
}
],
"created_at": "2026-03-27T14:00:40.845380446Z",
"updated_at": "2026-03-27T14:00:40.845380446Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "1e7a1b8d-6211-46b5-b030-02ac76f92564",
"name": "email_log",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/email_log.parquet",
"size_bytes": 16768671,
"created_at": "2026-03-27T13:43:32.341429856Z"
}
],
"created_at": "2026-03-27T13:43:32.341435388Z",
"updated_at": "2026-03-27T13:43:32.341435388Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "29c177bd-3728-428a-ab0f-95169aae1106",
"name": "timesheets",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/timesheets.parquet",
"size_bytes": 17539932,
"created_at": "2026-03-27T13:43:26.951181242Z"
}
],
"created_at": "2026-03-27T13:43:26.951188331Z",
"updated_at": "2026-03-27T13:43:26.951188331Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "812e7d9a-0f50-49c0-b121-4cf758c304d9",
"name": "placements",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/placements.parquet",
"size_bytes": 1213820,
"created_at": "2026-03-27T13:43:22.173146233Z"
}
],
"created_at": "2026-03-27T13:43:22.173152301Z",
"updated_at": "2026-03-27T13:43:22.173152301Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "91413428-b4b1-44b3-bb8d-5cb326019879",
"name": "job_orders",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/job_orders.parquet",
"size_bytes": 905534,
"created_at": "2026-03-27T13:43:22.036039453Z"
}
],
"created_at": "2026-03-27T13:43:22.036045131Z",
"updated_at": "2026-03-27T13:43:22.036045131Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "9bb57bf9-2c19-42ed-84f4-83fd3c52b94a",
"name": "clients",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/clients.parquet",
"size_bytes": 21971,
"created_at": "2026-03-27T13:43:21.933347525Z"
}
],
"created_at": "2026-03-27T13:43:21.933351887Z",
"updated_at": "2026-03-27T13:43:21.933351887Z"
}

View File

@ -0,0 +1,15 @@
{
"id": "d2ce2995-9c60-49c9-9b41-197020cebaae",
"name": "placements",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/placements.parquet",
"size_bytes": 1213820,
"created_at": "2026-03-27T14:00:35.885543632Z"
}
],
"created_at": "2026-03-27T14:00:35.885550623Z",
"updated_at": "2026-03-27T14:00:35.885550623Z"
}

View File

@ -0,0 +1,15 @@
{
"id": "d8170213-d6af-4478-ae23-59f06fda3165",
"name": "job_orders",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/job_orders.parquet",
"size_bytes": 905534,
"created_at": "2026-03-27T14:00:35.780022147Z"
}
],
"created_at": "2026-03-27T14:00:35.780029168Z",
"updated_at": "2026-03-27T14:00:35.780029168Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "e1607b56-a826-4826-845a-76918127c6bf",
"name": "call_log",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/call_log.parquet",
"size_bytes": 35951077,
"created_at": "2026-03-27T13:43:30.485776088Z"
}
],
"created_at": "2026-03-27T13:43:30.485783579Z",
"updated_at": "2026-03-27T13:43:30.485783579Z"
}

View File

@ -0,0 +1,15 @@
{
"id": "e26d3633-a341-4229-9819-f287d98b788a",
"name": "candidates",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/candidates.parquet",
"size_bytes": 10592165,
"created_at": "2026-03-27T14:00:35.662150713Z"
}
],
"created_at": "2026-03-27T14:00:35.662162510Z",
"updated_at": "2026-03-27T14:00:35.662162510Z"
}

View File

@ -0,0 +1,15 @@
{
"id": "e4b8441f-d729-4465-91fb-2ed5f481e65d",
"name": "clients",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/clients.parquet",
"size_bytes": 21971,
"created_at": "2026-03-27T14:00:35.670181596Z"
}
],
"created_at": "2026-03-27T14:00:35.670184688Z",
"updated_at": "2026-03-27T14:00:35.670184688Z"
}

View File

@ -1,40 +0,0 @@
{
"id": "ws-1774619041730",
"name": "Apex Corp - .NET Developers Chicago",
"description": "Fill 5 .NET developer positions for Apex Corp, downtown Chicago, $65-85/hr bill rate",
"tier": "weekly",
"owner": "Sarah",
"previous_owners": [],
"created_at": "2026-03-27T13:44:01.730143708Z",
"updated_at": "2026-03-27T13:44:08.530268827Z",
"saved_searches": [
{
"name": "Chicago .NET active candidates",
"sql": "SELECT candidate_id, first_name, last_name, phone, email, years_experience FROM candidates WHERE city = 'Chicago' AND skills LIKE '%.NET%' AND status = 'active' ORDER BY years_experience DESC",
"created_at": "2026-03-27T13:44:01.731891844Z"
},
{
"name": "test",
"sql": "SELECT 1",
"created_at": "2026-03-27T13:44:08.530262069Z"
}
],
"shortlist": [],
"activity": [
{
"action": "search",
"detail": "saved search: Chicago .NET active candidates",
"timestamp": "2026-03-27T13:44:01.731898474Z",
"agent": "Sarah"
},
{
"action": "search",
"detail": "saved search: test",
"timestamp": "2026-03-27T13:44:08.530268200Z",
"agent": "Sarah"
}
],
"ingested_datasets": [],
"delta_keys": [],
"tags": []
}

View File

@ -1,130 +0,0 @@
{
"id": "ws-1774619071313",
"name": "Apex Corp - .NET Developers Chicago",
"description": "Fill 5 .NET developer positions, downtown Chicago, $65-85/hr",
"tier": "weekly",
"owner": "Mike",
"previous_owners": [
{
"from_agent": "Sarah",
"to_agent": "Mike",
"reason": "Sarah on PTO, Mike covering Apex account",
"timestamp": "2026-03-27T13:44:31.531544562Z"
}
],
"created_at": "2026-03-27T13:44:31.313179900Z",
"updated_at": "2026-03-27T13:44:31.534554639Z",
"saved_searches": [
{
"name": "Chicago .NET active",
"sql": "SELECT candidate_id, first_name, last_name, phone, years_experience FROM candidates WHERE city = 'Chicago' AND skills LIKE '%.NET%' AND status = 'active' ORDER BY years_experience DESC",
"created_at": "2026-03-27T13:44:31.314740279Z"
},
{
"name": "High-bill .NET history",
"sql": "SELECT p.candidate_id, c.first_name, c.last_name, p.bill_rate FROM placements p JOIN candidates c ON p.candidate_id = c.candidate_id JOIN job_orders j ON p.job_order_id = j.job_order_id WHERE j.title LIKE '%.NET%' AND p.bill_rate > 60 ORDER BY p.bill_rate DESC LIMIT 20",
"created_at": "2026-03-27T13:44:31.315923201Z"
}
],
"shortlist": [
{
"dataset": "candidates",
"record_id": "CAND-006645",
"notes": "Joseph Hill — 30yr .NET exp",
"added_at": "2026-03-27T13:44:31.524757463Z",
"added_by": "Sarah"
},
{
"dataset": "candidates",
"record_id": "CAND-020078",
"notes": "Jessica Jones — 30yr .NET exp",
"added_at": "2026-03-27T13:44:31.525965891Z",
"added_by": "Sarah"
},
{
"dataset": "candidates",
"record_id": "CAND-015656",
"notes": "Barbara Wright — 30yr .NET exp",
"added_at": "2026-03-27T13:44:31.527152483Z",
"added_by": "Sarah"
},
{
"dataset": "candidates",
"record_id": "CAND-00099",
"notes": "Mike found additional candidate via LinkedIn",
"added_at": "2026-03-27T13:44:31.534551709Z",
"added_by": "Mike"
}
],
"activity": [
{
"action": "search",
"detail": "saved search: Chicago .NET active",
"timestamp": "2026-03-27T13:44:31.314743876Z",
"agent": "Sarah"
},
{
"action": "search",
"detail": "saved search: High-bill .NET history",
"timestamp": "2026-03-27T13:44:31.315925687Z",
"agent": "Sarah"
},
{
"action": "shortlist",
"detail": "added CAND-006645 from candidates",
"timestamp": "2026-03-27T13:44:31.524762385Z",
"agent": "Sarah"
},
{
"action": "shortlist",
"detail": "added CAND-020078 from candidates",
"timestamp": "2026-03-27T13:44:31.525968748Z",
"agent": "Sarah"
},
{
"action": "shortlist",
"detail": "added CAND-015656 from candidates",
"timestamp": "2026-03-27T13:44:31.527155126Z",
"agent": "Sarah"
},
{
"action": "call",
"detail": "Called top 3 candidates, 2 interested",
"timestamp": "2026-03-27T13:44:31.528254640Z",
"agent": "Sarah"
},
{
"action": "email",
"detail": "Sent job descriptions to shortlist",
"timestamp": "2026-03-27T13:44:31.529452236Z",
"agent": "Sarah"
},
{
"action": "update",
"detail": "Candidate CAND-00025 confirmed for Thursday interview",
"timestamp": "2026-03-27T13:44:31.530540919Z",
"agent": "Sarah"
},
{
"action": "handoff",
"detail": "handed off to Mike — Sarah on PTO, Mike covering Apex account",
"timestamp": "2026-03-27T13:44:31.531546876Z",
"agent": "Mike"
},
{
"action": "call",
"detail": "Followed up with CAND-00025, interview confirmed",
"timestamp": "2026-03-27T13:44:31.533529588Z",
"agent": "Mike"
},
{
"action": "shortlist",
"detail": "added CAND-00099 from candidates",
"timestamp": "2026-03-27T13:44:31.534554347Z",
"agent": "Mike"
}
],
"ingested_datasets": [],
"delta_keys": [],
"tags": []
}