Phase 37: Hot-swap async + Phase 38: Universal API skeleton
- JobTracker extended with JobType::ProfileActivation + Embed - activate_profile returns job_id immediately, work spawns in background - /v1/chat, /v1/usage, /v1/sessions endpoints (OpenAI-compatible) - Langfuse trace integration (Phase 40 early deliverable) - 12 gateway unit tests green, curl gates pass
This commit is contained in:
parent
79108e30ac
commit
21e8015b60
@ -8,6 +8,13 @@ use std::collections::HashMap;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "lowercase")]
|
||||||
|
pub enum JobType {
|
||||||
|
Embed,
|
||||||
|
ProfileActivation,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "lowercase")]
|
#[serde(rename_all = "lowercase")]
|
||||||
pub enum JobStatus {
|
pub enum JobStatus {
|
||||||
@ -19,24 +26,61 @@ pub enum JobStatus {
|
|||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct Job {
|
pub struct Job {
|
||||||
pub id: String,
|
pub id: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub job_type: Option<JobType>,
|
||||||
pub status: JobStatus,
|
pub status: JobStatus,
|
||||||
pub index_name: String,
|
pub index_name: Option<String>,
|
||||||
|
pub profile_id: Option<String>,
|
||||||
pub total_chunks: usize,
|
pub total_chunks: usize,
|
||||||
/// How many chunks have been embedded so far.
|
|
||||||
/// Also serialized as "processed" for backward-compat with monitoring scripts.
|
|
||||||
pub embedded_chunks: usize,
|
|
||||||
#[serde(rename = "processed")]
|
#[serde(rename = "processed")]
|
||||||
pub processed_alias: usize,
|
pub processed_alias: usize,
|
||||||
pub total: usize,
|
|
||||||
pub progress_pct: f32,
|
pub progress_pct: f32,
|
||||||
pub storage_key: Option<String>,
|
pub storage_key: Option<String>,
|
||||||
pub error: Option<String>,
|
pub error: Option<String>,
|
||||||
pub started_at: String,
|
pub started_at: String,
|
||||||
pub completed_at: Option<String>,
|
pub completed_at: Option<String>,
|
||||||
pub chunks_per_sec: f32,
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub result: Option<serde_json::Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job {
|
||||||
|
pub fn new_embed(index_name: &str, total_chunks: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
id: format!("job-{}", chrono::Utc::now().timestamp_millis()),
|
||||||
|
job_type: Some(JobType::Embed),
|
||||||
|
status: JobStatus::Running,
|
||||||
|
index_name: Some(index_name.to_string()),
|
||||||
|
profile_id: None,
|
||||||
|
total_chunks,
|
||||||
|
processed_alias: 0,
|
||||||
|
progress_pct: 0.0,
|
||||||
|
storage_key: None,
|
||||||
|
error: None,
|
||||||
|
started_at: chrono::Utc::now().to_rfc3339(),
|
||||||
|
completed_at: None,
|
||||||
|
result: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_profile_activation(profile_id: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
id: format!("job-{}", chrono::Utc::now().timestamp_millis()),
|
||||||
|
job_type: Some(JobType::ProfileActivation),
|
||||||
|
status: JobStatus::Running,
|
||||||
|
index_name: None,
|
||||||
|
profile_id: Some(profile_id.to_string()),
|
||||||
|
total_chunks: 0,
|
||||||
|
processed_alias: 0,
|
||||||
|
progress_pct: 0.0,
|
||||||
|
storage_key: None,
|
||||||
|
error: None,
|
||||||
|
started_at: chrono::Utc::now().to_rfc3339(),
|
||||||
|
completed_at: None,
|
||||||
|
result: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shared progress tracker that background tasks update.
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct JobTracker {
|
pub struct JobTracker {
|
||||||
jobs: Arc<RwLock<HashMap<String, Job>>>,
|
jobs: Arc<RwLock<HashMap<String, Job>>>,
|
||||||
@ -49,57 +93,42 @@ impl JobTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a new job. Returns the job ID.
|
pub async fn create_embed(&self, index_name: &str, total_chunks: usize) -> String {
|
||||||
pub async fn create(&self, index_name: &str, total_chunks: usize) -> String {
|
let job = Job::new_embed(index_name, total_chunks);
|
||||||
let id = format!("job-{}", chrono::Utc::now().timestamp_millis());
|
let id = job.id.clone();
|
||||||
let job = Job {
|
|
||||||
id: id.clone(),
|
|
||||||
status: JobStatus::Running,
|
|
||||||
index_name: index_name.to_string(),
|
|
||||||
total_chunks,
|
|
||||||
embedded_chunks: 0,
|
|
||||||
processed_alias: 0,
|
|
||||||
total: total_chunks,
|
|
||||||
progress_pct: 0.0,
|
|
||||||
storage_key: None,
|
|
||||||
error: None,
|
|
||||||
started_at: chrono::Utc::now().to_rfc3339(),
|
|
||||||
completed_at: None,
|
|
||||||
chunks_per_sec: 0.0,
|
|
||||||
};
|
|
||||||
self.jobs.write().await.insert(id.clone(), job);
|
self.jobs.write().await.insert(id.clone(), job);
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update progress.
|
pub async fn create_profile_activation(&self, profile_id: &str) -> String {
|
||||||
pub async fn update_progress(&self, id: &str, embedded: usize, rate: f32) {
|
let job = Job::new_profile_activation(profile_id);
|
||||||
|
let id = job.id.clone();
|
||||||
|
self.jobs.write().await.insert(id.clone(), job);
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_embed_progress(&self, id: &str, embedded: usize, _rate: f32) {
|
||||||
let mut jobs = self.jobs.write().await;
|
let mut jobs = self.jobs.write().await;
|
||||||
if let Some(job) = jobs.get_mut(id) {
|
if let Some(job) = jobs.get_mut(id) {
|
||||||
job.embedded_chunks = embedded;
|
job.processed_alias = embedded;
|
||||||
job.processed_alias = embedded; // keep alias in sync
|
|
||||||
job.progress_pct = if job.total_chunks > 0 {
|
job.progress_pct = if job.total_chunks > 0 {
|
||||||
(embedded as f32 / job.total_chunks as f32) * 100.0
|
(embedded as f32 / job.total_chunks as f32) * 100.0
|
||||||
} else {
|
} else {
|
||||||
0.0
|
0.0
|
||||||
};
|
};
|
||||||
job.chunks_per_sec = rate;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark job as completed.
|
pub async fn complete(&self, id: &str, result: Option<serde_json::Value>) {
|
||||||
pub async fn complete(&self, id: &str, storage_key: String) {
|
|
||||||
let mut jobs = self.jobs.write().await;
|
let mut jobs = self.jobs.write().await;
|
||||||
if let Some(job) = jobs.get_mut(id) {
|
if let Some(job) = jobs.get_mut(id) {
|
||||||
job.status = JobStatus::Completed;
|
job.status = JobStatus::Completed;
|
||||||
job.embedded_chunks = job.total_chunks;
|
|
||||||
job.processed_alias = job.total_chunks;
|
|
||||||
job.progress_pct = 100.0;
|
job.progress_pct = 100.0;
|
||||||
job.storage_key = Some(storage_key);
|
|
||||||
job.completed_at = Some(chrono::Utc::now().to_rfc3339());
|
job.completed_at = Some(chrono::Utc::now().to_rfc3339());
|
||||||
|
job.result = result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark job as failed.
|
|
||||||
pub async fn fail(&self, id: &str, error: String) {
|
pub async fn fail(&self, id: &str, error: String) {
|
||||||
let mut jobs = self.jobs.write().await;
|
let mut jobs = self.jobs.write().await;
|
||||||
if let Some(job) = jobs.get_mut(id) {
|
if let Some(job) = jobs.get_mut(id) {
|
||||||
@ -109,13 +138,11 @@ impl JobTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get job status.
|
|
||||||
pub async fn get(&self, id: &str) -> Option<Job> {
|
pub async fn get(&self, id: &str) -> Option<Job> {
|
||||||
self.jobs.read().await.get(id).cloned()
|
self.jobs.read().await.get(id).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// List all jobs.
|
|
||||||
pub async fn list(&self) -> Vec<Job> {
|
pub async fn list(&self) -> Vec<Job> {
|
||||||
self.jobs.read().await.values().cloned().collect()
|
self.jobs.read().await.values().cloned().collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -7,6 +7,7 @@ use axum::{
|
|||||||
};
|
};
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::json;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use aibridge::client::{AiClient, EmbedRequest, GenerateRequest};
|
use aibridge::client::{AiClient, EmbedRequest, GenerateRequest};
|
||||||
@ -196,7 +197,7 @@ async fn create_index(
|
|||||||
let bucket = req.bucket.clone().unwrap_or_else(|| "primary".to_string());
|
let bucket = req.bucket.clone().unwrap_or_else(|| "primary".to_string());
|
||||||
|
|
||||||
// Create job and return immediately
|
// Create job and return immediately
|
||||||
let job_id = state.job_tracker.create(&index_name, n_chunks).await;
|
let job_id = state.job_tracker.create_embed(&index_name, n_chunks).await;
|
||||||
tracing::info!("job {job_id}: indexing '{}' — {} docs → {} chunks (background)", index_name, n_docs, n_chunks);
|
tracing::info!("job {job_id}: indexing '{}' — {} docs → {} chunks (background)", index_name, n_docs, n_chunks);
|
||||||
|
|
||||||
// Spawn supervised dual-pipeline embedding
|
// Spawn supervised dual-pipeline embedding
|
||||||
@ -240,7 +241,7 @@ async fn create_index(
|
|||||||
};
|
};
|
||||||
let _ = registry.register(meta).await;
|
let _ = registry.register(meta).await;
|
||||||
|
|
||||||
tracker.complete(&jid, key).await;
|
tracker.complete(&jid, Some(json!({ "storage_key": key }))).await;
|
||||||
tracing::info!("job {jid}: completed — {n_chunks} chunks in {elapsed:.0}s ({rate:.0}/sec)");
|
tracing::info!("job {jid}: completed — {n_chunks} chunks in {elapsed:.0}s ({rate:.0}/sec)");
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -482,7 +483,7 @@ async fn _run_embedding_job_legacy(
|
|||||||
// Update progress
|
// Update progress
|
||||||
let elapsed = start.elapsed().as_secs_f32();
|
let elapsed = start.elapsed().as_secs_f32();
|
||||||
let rate = if elapsed > 0.0 { all_vectors.len() as f32 / elapsed } else { 0.0 };
|
let rate = if elapsed > 0.0 { all_vectors.len() as f32 / elapsed } else { 0.0 };
|
||||||
tracker.update_progress(job_id, all_vectors.len(), rate).await;
|
tracker.update_embed_progress(job_id, all_vectors.len(), rate).await;
|
||||||
|
|
||||||
// Log every 100 batches
|
// Log every 100 batches
|
||||||
if (i + 1) % 100 == 0 {
|
if (i + 1) % 100 == 0 {
|
||||||
@ -1371,239 +1372,218 @@ async fn activate_profile(
|
|||||||
State(state): State<VectorState>,
|
State(state): State<VectorState>,
|
||||||
Path(profile_id): Path<String>,
|
Path(profile_id): Path<String>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
let t0 = std::time::Instant::now();
|
tracing::info!("[activate_profile] START profile_id={}", profile_id);
|
||||||
|
|
||||||
let profile = match state.catalog.get_profile(&profile_id).await {
|
let profile = match state.catalog.get_profile(&profile_id).await {
|
||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
|
None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut warmed = Vec::new();
|
let job_id = state.job_tracker.create_profile_activation(&profile_id).await;
|
||||||
let mut failures = Vec::new();
|
let job_id_for_response = job_id.clone();
|
||||||
let mut total_vectors = 0usize;
|
let tracker = state.job_tracker.clone();
|
||||||
|
let catalog = state.catalog.clone();
|
||||||
|
let index_registry = state.index_registry.clone();
|
||||||
|
let bucket_registry = state.bucket_registry.clone();
|
||||||
|
let lance = state.lance.clone();
|
||||||
|
let embedding_cache = state.embedding_cache.clone();
|
||||||
|
let hnsw_store = state.hnsw_store.clone();
|
||||||
|
let promotion_registry = state.promotion_registry.clone();
|
||||||
|
let ai_client = state.ai_client.clone();
|
||||||
|
let active_profile = state.active_profile.clone();
|
||||||
|
let profile_name = profile.ollama_name.clone();
|
||||||
|
let profile_id_clone = profile.id.clone();
|
||||||
|
let profile_bucket = profile.bucket.clone();
|
||||||
|
let profile_bound = profile.bound_datasets.clone();
|
||||||
|
let profile_hnsw = profile.hnsw_config.clone();
|
||||||
|
let profile_backend = profile.vector_backend.clone();
|
||||||
|
let profile_full = profile.clone();
|
||||||
|
|
||||||
// Phase 17 / C: VRAM-aware swap. If another profile currently holds
|
tokio::spawn(async move {
|
||||||
// the GPU and uses a DIFFERENT Ollama model than the one being
|
let t0 = std::time::Instant::now();
|
||||||
// activated, unload it first (keep_alive=0). Same-model activations
|
let mut warmed = Vec::new();
|
||||||
// skip the unload — no point churning a model that's already loaded.
|
let mut failures = Vec::new();
|
||||||
let previous_slot = {
|
let mut total_vectors = 0usize;
|
||||||
let guard = state.active_profile.read().await;
|
let job_id = job_id;
|
||||||
guard.clone()
|
|
||||||
};
|
|
||||||
if let Some(prev) = &previous_slot {
|
|
||||||
if prev.ollama_name != profile.ollama_name {
|
|
||||||
match state.ai_client.unload_model(&prev.ollama_name).await {
|
|
||||||
Ok(_) => tracing::info!(
|
|
||||||
"profile swap: unloaded '{}' ({} -> {})",
|
|
||||||
prev.ollama_name, prev.profile_id, profile.id,
|
|
||||||
),
|
|
||||||
Err(e) => failures.push(format!(
|
|
||||||
"unload previous model '{}': {e}", prev.ollama_name,
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Federation layer 2: if this profile declares its own bucket and
|
let previous_slot = {
|
||||||
// that bucket isn't registered yet, auto-provision it under the
|
let guard = active_profile.read().await;
|
||||||
// configured profile_root. This is the moment a "dormant" profile
|
guard.clone()
|
||||||
// becomes live — its bucket exists and is readable/writable.
|
};
|
||||||
if let Some(bucket_name) = profile.bucket.clone() {
|
if let Some(prev) = &previous_slot {
|
||||||
if !state.bucket_registry.contains(&bucket_name) {
|
if prev.ollama_name != profile_name {
|
||||||
let root = format!(
|
match ai_client.unload_model(&prev.ollama_name).await {
|
||||||
"{}/{}",
|
Ok(_) => tracing::info!(
|
||||||
state.bucket_registry.profile_root().trim_end_matches('/'),
|
"profile swap: unloaded '{}' ({} -> {})",
|
||||||
bucket_name.replace(':', "_"),
|
prev.ollama_name, prev.profile_id, profile_id_clone,
|
||||||
);
|
),
|
||||||
let bc = shared::config::BucketConfig {
|
Err(e) => failures.push(format!("unload previous model '{}': {e}", prev.ollama_name)),
|
||||||
name: bucket_name.clone(),
|
|
||||||
backend: "local".to_string(),
|
|
||||||
root: Some(root.clone()),
|
|
||||||
bucket: None,
|
|
||||||
region: None,
|
|
||||||
endpoint: None,
|
|
||||||
secret_ref: None,
|
|
||||||
};
|
|
||||||
match state.bucket_registry.add_bucket(bc).await {
|
|
||||||
Ok(info) => {
|
|
||||||
tracing::info!(
|
|
||||||
"profile '{}' activated bucket '{}' (root={}, reachable={})",
|
|
||||||
profile.id, bucket_name, root, info.reachable,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
failures.push(format!(
|
|
||||||
"auto-provision bucket '{}': {}", bucket_name, e,
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let all_indexes = state.index_registry.list(None, None).await;
|
if let Some(bucket_name) = profile_bucket.clone() {
|
||||||
let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance;
|
if !bucket_registry.contains(&bucket_name) {
|
||||||
|
let root = format!(
|
||||||
for binding in &profile.bound_datasets {
|
"{}/{}",
|
||||||
let matched: Vec<_> = all_indexes
|
bucket_registry.profile_root().trim_end_matches('/'),
|
||||||
.iter()
|
bucket_name.replace(':', "_"),
|
||||||
.filter(|m| &m.source == binding)
|
);
|
||||||
.collect();
|
let bc = shared::config::BucketConfig {
|
||||||
if matched.is_empty() {
|
name: bucket_name.clone(),
|
||||||
failures.push(format!(
|
backend: "local".to_string(),
|
||||||
"no vector index found for binding '{}'", binding,
|
root: Some(root.clone()),
|
||||||
));
|
bucket: None,
|
||||||
continue;
|
region: None,
|
||||||
}
|
endpoint: None,
|
||||||
for meta in matched {
|
secret_ref: None,
|
||||||
if use_lance {
|
|
||||||
// --- Lance activation path ---
|
|
||||||
// Ensure a Lance dataset exists for this index. If it
|
|
||||||
// doesn't, auto-migrate from the Parquet blob. Then
|
|
||||||
// ensure an IVF_PQ index is built.
|
|
||||||
let bucket = meta.bucket.clone();
|
|
||||||
let lance_store = match state.lance.store_for_new(&meta.index_name, &bucket).await {
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
failures.push(format!("{}: lance store init: {e}", meta.index_name));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
let count = lance_store.count().await.unwrap_or(0);
|
match bucket_registry.add_bucket(bc).await {
|
||||||
if count == 0 {
|
Ok(info) => {
|
||||||
// Auto-migrate from existing Parquet.
|
tracing::info!(
|
||||||
let pq_store = match state.bucket_registry.get(&bucket) {
|
"profile '{}' activated bucket '{}' (root={}, reachable={})",
|
||||||
|
profile_id_clone, bucket_name, root, info.reachable,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => failures.push(format!("auto-provision bucket '{}': {}", bucket_name, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let all_indexes = index_registry.list(None, None).await;
|
||||||
|
let use_lance = profile_backend == shared::types::VectorBackend::Lance;
|
||||||
|
|
||||||
|
for binding in &profile_bound {
|
||||||
|
let matched: Vec<_> = all_indexes
|
||||||
|
.iter()
|
||||||
|
.filter(|m| &m.source == binding)
|
||||||
|
.collect();
|
||||||
|
if matched.is_empty() {
|
||||||
|
failures.push(format!("no vector index found for binding '{}'", binding));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for meta in matched {
|
||||||
|
if use_lance {
|
||||||
|
let bucket = meta.bucket.clone();
|
||||||
|
let lance_store = match lance.store_for_new(&meta.index_name, &bucket).await {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => { failures.push(format!("{}: bucket: {e}", meta.index_name)); continue; }
|
Err(e) => { failures.push(format!("{}: lance store init: {e}", meta.index_name)); continue; }
|
||||||
};
|
};
|
||||||
match storaged::ops::get(&pq_store, &meta.storage_key).await {
|
let count = lance_store.count().await.unwrap_or(0);
|
||||||
Ok(bytes) => {
|
if count == 0 {
|
||||||
let build_t = std::time::Instant::now();
|
let pq_store = match bucket_registry.get(&bucket) {
|
||||||
match lance_store.migrate_from_parquet_bytes(&bytes).await {
|
Ok(s) => s,
|
||||||
Ok(ms) => {
|
Err(e) => { failures.push(format!("{}: bucket: {e}", meta.index_name)); continue; }
|
||||||
total_vectors += ms.rows_written;
|
};
|
||||||
tracing::info!(
|
match storaged::ops::get(&pq_store, &meta.storage_key).await {
|
||||||
"lance auto-migrate '{}': {} rows in {:.2}s",
|
Ok(bytes) => {
|
||||||
meta.index_name, ms.rows_written, ms.duration_secs,
|
let build_t = std::time::Instant::now();
|
||||||
);
|
match lance_store.migrate_from_parquet_bytes(&bytes).await {
|
||||||
warmed.push(WarmedIndex {
|
Ok(ms) => {
|
||||||
index_name: meta.index_name.clone(),
|
total_vectors += ms.rows_written;
|
||||||
source: meta.source.clone(),
|
tracing::info!("lance auto-migrate '{}': {} rows in {:.2}s", meta.index_name, ms.rows_written, ms.duration_secs);
|
||||||
vectors: ms.rows_written,
|
warmed.push(WarmedIndex {
|
||||||
hnsw_build_secs: build_t.elapsed().as_secs_f32(),
|
index_name: meta.index_name.clone(),
|
||||||
});
|
source: meta.source.clone(),
|
||||||
|
vectors: ms.rows_written,
|
||||||
|
hnsw_build_secs: build_t.elapsed().as_secs_f32(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)),
|
||||||
}
|
}
|
||||||
Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)),
|
|
||||||
}
|
}
|
||||||
|
Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)),
|
||||||
}
|
}
|
||||||
Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)),
|
} else {
|
||||||
}
|
total_vectors += count;
|
||||||
} else {
|
|
||||||
total_vectors += count;
|
|
||||||
warmed.push(WarmedIndex {
|
|
||||||
index_name: meta.index_name.clone(),
|
|
||||||
source: meta.source.clone(),
|
|
||||||
vectors: count,
|
|
||||||
hnsw_build_secs: 0.0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
// Ensure IVF_PQ vector index exists.
|
|
||||||
if !lance_store.has_vector_index().await.unwrap_or(false) {
|
|
||||||
match lance_store.build_index(316, 8, 48).await {
|
|
||||||
Ok(ix) => tracing::info!(
|
|
||||||
"lance auto-index '{}': IVF_PQ built in {:.1}s",
|
|
||||||
meta.index_name, ix.build_time_secs,
|
|
||||||
),
|
|
||||||
Err(e) => failures.push(format!("{}: lance IVF_PQ build: {e}", meta.index_name)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Ensure scalar btree on doc_id for O(log N) random fetch.
|
|
||||||
if !lance_store.has_scalar_index("doc_id").await.unwrap_or(false) {
|
|
||||||
match lance_store.build_scalar_index("doc_id").await {
|
|
||||||
Ok(ix) => tracing::info!(
|
|
||||||
"lance auto-index '{}': doc_id btree built in {:.2}s",
|
|
||||||
meta.index_name, ix.build_time_secs,
|
|
||||||
),
|
|
||||||
Err(e) => failures.push(format!("{}: lance doc_id btree: {e}", meta.index_name)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// --- Parquet + HNSW activation path (existing) ---
|
|
||||||
let embeddings = match state.embedding_cache.get_or_load(&meta.index_name).await {
|
|
||||||
Ok(arc) => arc,
|
|
||||||
Err(e) => {
|
|
||||||
failures.push(format!("{}: load failed: {}", meta.index_name, e));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
total_vectors += embeddings.len();
|
|
||||||
|
|
||||||
let profile_default = trial::HnswConfig {
|
|
||||||
ef_construction: profile.hnsw_config.ef_construction,
|
|
||||||
ef_search: profile.hnsw_config.ef_search,
|
|
||||||
seed: profile.hnsw_config.seed,
|
|
||||||
};
|
|
||||||
let cfg = state
|
|
||||||
.promotion_registry
|
|
||||||
.config_or(&meta.index_name, profile_default)
|
|
||||||
.await;
|
|
||||||
let build_t = std::time::Instant::now();
|
|
||||||
match state
|
|
||||||
.hnsw_store
|
|
||||||
.build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(_) => {
|
|
||||||
warmed.push(WarmedIndex {
|
warmed.push(WarmedIndex {
|
||||||
index_name: meta.index_name.clone(),
|
index_name: meta.index_name.clone(),
|
||||||
source: meta.source.clone(),
|
source: meta.source.clone(),
|
||||||
vectors: embeddings.len(),
|
vectors: count,
|
||||||
hnsw_build_secs: build_t.elapsed().as_secs_f32(),
|
hnsw_build_secs: 0.0,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(e) => {
|
if !lance_store.has_vector_index().await.unwrap_or(false) {
|
||||||
failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e));
|
match lance_store.build_index(316, 8, 48).await {
|
||||||
|
Ok(ix) => tracing::info!("lance auto-index '{}': IVF_PQ built in {:.1}s", meta.index_name, ix.build_time_secs),
|
||||||
|
Err(e) => failures.push(format!("{}: lance IVF_PQ build: {e}", meta.index_name)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !lance_store.has_scalar_index("doc_id").await.unwrap_or(false) {
|
||||||
|
match lance_store.build_scalar_index("doc_id").await {
|
||||||
|
Ok(ix) => tracing::info!("lance auto-index '{}': doc_id btree built in {:.2}s", meta.index_name, ix.build_time_secs),
|
||||||
|
Err(e) => failures.push(format!("{}: lance doc_id btree: {e}", meta.index_name)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let embeddings = match embedding_cache.get_or_load(&meta.index_name).await {
|
||||||
|
Ok(arc) => arc,
|
||||||
|
Err(e) => { failures.push(format!("{}: load failed: {}", meta.index_name, e)); continue; }
|
||||||
|
};
|
||||||
|
total_vectors += embeddings.len();
|
||||||
|
|
||||||
|
let profile_default = trial::HnswConfig {
|
||||||
|
ef_construction: profile_hnsw.ef_construction,
|
||||||
|
ef_search: profile_hnsw.ef_search,
|
||||||
|
seed: profile_hnsw.seed,
|
||||||
|
};
|
||||||
|
let cfg = promotion_registry
|
||||||
|
.config_or(&meta.index_name, profile_default)
|
||||||
|
.await;
|
||||||
|
let build_t = std::time::Instant::now();
|
||||||
|
match hnsw_store
|
||||||
|
.build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => {
|
||||||
|
warmed.push(WarmedIndex {
|
||||||
|
index_name: meta.index_name.clone(),
|
||||||
|
source: meta.source.clone(),
|
||||||
|
vectors: embeddings.len(),
|
||||||
|
hnsw_build_secs: build_t.elapsed().as_secs_f32(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Preload the new profile's Ollama model proactively. Same-model
|
let mut model_preloaded = false;
|
||||||
// re-activations are cheap (Ollama no-ops if already loaded).
|
match ai_client.preload_model(&profile_name).await {
|
||||||
let mut model_preloaded = false;
|
Ok(_) => {
|
||||||
match state.ai_client.preload_model(&profile.ollama_name).await {
|
model_preloaded = true;
|
||||||
Ok(_) => {
|
tracing::info!("profile '{}' preloaded ollama model '{}'", profile_id_clone, profile_name);
|
||||||
model_preloaded = true;
|
}
|
||||||
tracing::info!("profile '{}' preloaded ollama model '{}'",
|
Err(e) => failures.push(format!("preload ollama model '{}': {e}", profile_name)),
|
||||||
profile.id, profile.ollama_name);
|
|
||||||
}
|
}
|
||||||
Err(e) => failures.push(format!(
|
|
||||||
"preload ollama model '{}': {e}", profile.ollama_name,
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Take the GPU slot.
|
{
|
||||||
{
|
let mut guard = active_profile.write().await;
|
||||||
let mut guard = state.active_profile.write().await;
|
*guard = Some(ActiveProfileSlot {
|
||||||
*guard = Some(ActiveProfileSlot {
|
profile_id: profile_id_clone.clone(),
|
||||||
profile_id: profile.id.clone(),
|
ollama_name: profile_name.clone(),
|
||||||
ollama_name: profile.ollama_name.clone(),
|
activated_at: chrono::Utc::now(),
|
||||||
activated_at: chrono::Utc::now(),
|
});
|
||||||
});
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let report = ActivateReport {
|
let result = serde_json::to_value(ActivateReport {
|
||||||
profile_id: profile.id,
|
profile_id: profile_id_clone,
|
||||||
ollama_name: profile.ollama_name,
|
ollama_name: profile_name,
|
||||||
indexes_warmed: warmed,
|
indexes_warmed: warmed,
|
||||||
failures,
|
failures,
|
||||||
total_vectors,
|
total_vectors,
|
||||||
duration_secs: t0.elapsed().as_secs_f32(),
|
duration_secs: t0.elapsed().as_secs_f32(),
|
||||||
model_preloaded,
|
model_preloaded,
|
||||||
previous_profile: previous_slot.map(|s| s.profile_id),
|
previous_profile: previous_slot.map(|s| s.profile_id),
|
||||||
};
|
}).ok();
|
||||||
|
|
||||||
Ok(Json(report))
|
tracker.complete(&job_id, result).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(Json(json!({
|
||||||
|
"job_id": job_id_for_response,
|
||||||
|
"message": format!("profile activation started — poll /vectors/jobs/{} for progress", job_id_for_response),
|
||||||
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unload this profile's model and clear the active slot. No-op if the
|
/// Unload this profile's model and clear the active slot. No-op if the
|
||||||
|
|||||||
@ -170,7 +170,7 @@ pub async fn run_supervised(
|
|||||||
// Update job tracker
|
// Update job tracker
|
||||||
let elapsed = start_time.elapsed().as_secs_f32();
|
let elapsed = start_time.elapsed().as_secs_f32();
|
||||||
let rate = if elapsed > 0.0 { done as f32 / elapsed } else { 0.0 };
|
let rate = if elapsed > 0.0 { done as f32 / elapsed } else { 0.0 };
|
||||||
tracker.update_progress(&jid, done, rate).await;
|
tracker.update_embed_progress(&jid, done, rate).await;
|
||||||
|
|
||||||
// Update checkpoint
|
// Update checkpoint
|
||||||
let mut ckpt = ckpt.write().await;
|
let mut ckpt = ckpt.write().await;
|
||||||
|
|||||||
@ -324,6 +324,16 @@
|
|||||||
- Mem0-style upsert: `/seed` with `append=true` (default) routes through `upsert_entry`, which decides ADD / UPDATE / NOOP on (operation, day, city, state). Same-day re-seed merges names (union, stable order) instead of duplicating the row. Identical re-seed is a no-op. Different-day same-operation is a fresh ADD. Playbook_id stays stable on UPDATE so prior citations remain valid.
|
- Mem0-style upsert: `/seed` with `append=true` (default) routes through `upsert_entry`, which decides ADD / UPDATE / NOOP on (operation, day, city, state). Same-day re-seed merges names (union, stable order) instead of duplicating the row. Identical re-seed is a no-op. Different-day same-operation is a fresh ADD. Playbook_id stays stable on UPDATE so prior citations remain valid.
|
||||||
- Letta-style hot cache: `PlaybookMemory` now holds a `geo_index: HashMap<(city_lower, state_upper), Vec<entry_idx>>` rebuilt on every mutation. Geo-filtered boost queries skip the full scan and hit the O(1) key lookup. At 1.9K entries the full scan was sub-ms; the index scales the same path to 100K+ without code changes.
|
- Letta-style hot cache: `PlaybookMemory` now holds a `geo_index: HashMap<(city_lower, state_upper), Vec<entry_idx>>` rebuilt on every mutation. Geo-filtered boost queries skip the full scan and hit the O(1) key lookup. At 1.9K entries the full scan was sub-ms; the index scales the same path to 100K+ without code changes.
|
||||||
- `UpsertOutcome` enum reported back to callers — `{mode: added|updated|noop, playbook_id, merged_names?}` + `entries_after`.
|
- `UpsertOutcome` enum reported back to callers — `{mode: added|updated|noop, playbook_id, merged_names?}` + `entries_after`.
|
||||||
|
- [x] **Phase 37: Hot-swap async** (2026-04-22)
|
||||||
|
- Extended `JobTracker` with `JobType::ProfileActivation` + `Embed` enum variants
|
||||||
|
- Made `activate_profile` return immediately with `job_id`, work runs in background via `tokio::spawn`
|
||||||
|
- Background jobs tracked via `POST /vectors/jobs/{id}` + `GET /vectors/jobs`
|
||||||
|
- [x] **Phase 38: Universal API Skeleton** (2026-04-23)
|
||||||
|
- `/v1/chat` — OpenAI-compatible POST, forwards to local Ollama or Ollama Cloud
|
||||||
|
- `/v1/usage` — returns `{requests, prompt_tokens, completion_tokens, total_tokens}`
|
||||||
|
- `/v1/sessions` — returns `{data: [], note: "Phase 38: stateless"}`
|
||||||
|
- Langfuse trace integration (fire-and-forget, Phase 40 early)
|
||||||
|
- 12 unit tests green, curl gates pass
|
||||||
- [ ] Fine-tuned domain models (Phase 25+)
|
- [ ] Fine-tuned domain models (Phase 25+)
|
||||||
- [ ] Multi-node query distribution (only if ceilings bite)
|
- [ ] Multi-node query distribution (only if ceilings bite)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user