root e9d17f7d5a sanitize: drop over-broad path-missing branch + UTF-8-safe redaction
Re-scrum of yesterday's sanitizer fix surfaced 2 more real bugs in the
fix itself (opus, both WARN, neither caught by kimi/qwen):

W1 (service.rs:1949) — `mentions_path_missing` standalone branch was
too aggressive. A registry-internal error like "/root/.cargo/.../x.rs:
no such file or directory" would 404 because it triggers without
dataset context. That's a real 500. Dropped the standalone branch;
require dataset context AND missing-shape phrase. Lance's actual
"Dataset at path X was not found" still satisfies it.

W2 (service.rs:2018) — `out.push(bytes[i] as char)` corrupted
multi-byte UTF-8 by casting raw bytes to char (only sound for ASCII
< 128). A path containing user-supplied non-ASCII names produced
Latin-1 mojibake. Rewrote redact_paths to track byte indices and
emit unmatched runs as &str slices via push_str(&s[range]) — preserves
multi-byte sequences verbatim. Step advance is now per-char, not
per-byte, via small utf8_char_len helper.

Two new regression tests:
- is_not_found_does_not_match_unrelated_path_missing
- redact_preserves_multibyte_utf8 (uses 工作 + café in input)

12/12 sanitize tests PASS. Smoke 10/10 PASS. Loop closure for opus
re-scrum on the 2026-05-02 fix bundle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 00:15:23 -05:00

3447 lines
134 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use axum::{
Json, Router,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use aibridge::client::{AiClient, EmbedRequest, GenerateRequest};
use catalogd::registry::Registry as CatalogRegistry;
use storaged::registry::BucketRegistry;
use crate::{agent, autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, lance_backend, pathway_memory, playbook_memory, promotion, rag, refresh, search, store, supervisor, trial};
use tokio::sync::Semaphore;
/// Shared state handed to every vectord route handler via axum `State`.
/// `Clone` is cheap — each field is a handle (Arc, client, or registry),
/// so clones share the underlying data.
#[derive(Clone)]
pub struct VectorState {
    /// Object store holding index artifacts — `store::load_embeddings`
    /// reads parquet vector data from here.
    pub store: Arc<dyn ObjectStore>,
    /// Client for embedding (/search, /hybrid) and text generation
    /// (/rag, hybrid answer step) calls.
    pub ai_client: AiClient,
    /// Background-job bookkeeping (create/complete/fail); polled via
    /// GET /jobs and GET /jobs/{id}.
    pub job_tracker: jobs::JobTracker,
    /// Per-index metadata (IndexMeta): source, model, bucket, backend.
    pub index_registry: index_registry::IndexRegistry,
    /// HNSW graphs backing the /hnsw/* routes and the unfiltered
    /// /hybrid search path.
    pub hnsw_store: hnsw::HnswStore,
    /// Cache of loaded embeddings — evicted on bucket migration so the
    /// next load re-reads from the new bucket.
    pub embedding_cache: embedding_cache::EmbeddingCache,
    // NOTE(review): trial_journal is not touched in this chunk;
    // presumably backs the /hnsw/trial* endpoints — confirm.
    pub trial_journal: trial::TrialJournal,
    /// Federation-aware harness store — resolves eval artifacts to each
    /// index's recorded bucket, falling back to primary for legacy evals.
    pub harness_store: harness::HarnessStore,
    /// Catalog registry — needed by the Phase C refresh path to mark/clear
    /// staleness and look up dataset manifests. Also used to build the
    /// query engine for /hybrid SQL filtering.
    pub catalog: CatalogRegistry,
    /// Phase 16: promoted HNSW configs. Activation + autotune read/write here.
    pub promotion_registry: promotion::PromotionRegistry,
    /// Phase 16.2: handle to the background autotune agent. Always
    /// present — if the agent is disabled in config, the handle drops
    /// incoming triggers silently.
    pub agent_handle: agent::AgentHandle,
    /// Phase B (federation layer 2): bucket registry for per-profile
    /// bucket auto-provisioning on activation.
    pub bucket_registry: Arc<BucketRegistry>,
    /// Phase C (two-profile VRAM gate): tracks which profile is currently
    /// "active" on the GPU. Singleton — one profile at a time holds its
    /// model in VRAM. Swapping profiles with different ollama_name unloads
    /// the previous one (keep_alive=0) before preloading the new one.
    ///
    /// `None` = no profile has been activated this session; any first
    /// activation just preloads and takes the slot.
    pub active_profile: Arc<tokio::sync::RwLock<Option<ActiveProfileSlot>>>,
    /// ADR-019 hybrid: handles to Lance datasets keyed by index name.
    /// Lazy-created on first /vectors/lance/* call.
    pub lance: lance_backend::LanceRegistry,
    /// Phase 19 — meta-index feedback. Embeds past successful_playbooks
    /// and, when `use_playbook_memory` is set on /vectors/hybrid, boosts
    /// workers that were actually filled in semantically-similar past ops.
    pub playbook_memory: playbook_memory::PlaybookMemory,
    /// Pathway memory — consensus-designed sidecar for full-context
    /// backtracking + hot-swap of successful review pathways. See
    /// crates/vectord/src/pathway_memory.rs for the design rationale
    /// (10-probe N=3 ensemble, locked 2026-04-24).
    pub pathway_memory: pathway_memory::PathwayMemory,
    /// Serializes embed calls from seed_playbook_memory to avoid
    /// concurrent socket collisions with the Python sidecar.
    pub embed_semaphore: Arc<Semaphore>,
}
/// What the active-profile singleton records. Narrow — we don't need the
/// full ModelProfile here, just enough to know what to unload on swap.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveProfileSlot {
    /// Profile id as used in the /profile/{id}/* routes.
    pub profile_id: String,
    /// Model name Ollama knows this profile by — the unload target when
    /// another profile takes the VRAM slot.
    pub ollama_name: String,
    /// When this profile took the active slot.
    pub activated_at: chrono::DateTime<chrono::Utc>,
}
/// Build the vectord route table. Comments elsewhere in this file refer
/// to these paths under a /vectors prefix (e.g. /vectors/jobs/{id}), so
/// the caller presumably nests this router there — confirm at the mount
/// site. All handlers share one `VectorState` via `with_state`.
pub fn router(state: VectorState) -> Router {
    Router::new()
        .route("/health", get(health))
        .route("/index", post(create_index))
        .route("/indexes", get(list_indexes))
        .route("/indexes/{name}", get(get_index_meta))
        .route("/indexes/{name}/bucket", axum::routing::patch(migrate_index_bucket))
        .route("/jobs", get(list_jobs))
        .route("/jobs/{id}", get(get_job))
        // PRD Phase 41 alias — docs/CONTROL_PLANE_PRD.md specifies
        // GET /vectors/profile/jobs/{id} for polling profile activations.
        // Same handler as /jobs/{id}; the alias just matches the PRD URL.
        .route("/profile/jobs/{id}", get(get_job))
        .route("/search", post(search_index))
        .route("/rag", post(rag_query))
        .route("/hybrid", post(hybrid_search))
        .route("/hnsw/build", post(build_hnsw))
        .route("/hnsw/search", post(search_hnsw))
        .route("/hnsw/list", get(list_hnsw))
        // Trial system — parameterized tuning loop
        .route("/hnsw/trial", post(run_trial))
        .route("/hnsw/trials/{index_name}", get(list_trials))
        .route("/hnsw/trials/{index_name}/best", get(best_trial))
        // Eval sets
        .route("/hnsw/evals", get(list_evals))
        .route("/hnsw/evals/{name}", get(get_eval).put(put_eval))
        .route("/hnsw/evals/{name}/autogen", post(autogen_eval))
        // Cache management
        .route("/hnsw/cache/stats", get(cache_stats))
        .route("/hnsw/cache/{index_name}", axum::routing::delete(cache_evict))
        // Phase C: embedding refresh
        .route("/refresh/{dataset_name}", post(refresh_dataset))
        .route("/stale", get(list_stale))
        // Phase 17: profile activation — pre-load caches + HNSW for this
        // model's bound data. First search after activate is warm.
        .route("/profile/{id}/activate", post(activate_profile))
        .route("/profile/{id}/deactivate", post(deactivate_profile))
        .route("/profile/{id}/search", post(profile_scoped_search))
        // Phase 17 VRAM gate: which profile currently owns the GPU?
        .route("/profile/active", get(get_active_profile))
        // Phase 16: promotion + autotune
        .route("/hnsw/promote/{index}/{trial_id}", post(promote_trial))
        .route("/hnsw/rollback/{index}", post(rollback_promotion))
        .route("/hnsw/promoted/{index}", get(get_promoted))
        .route("/hnsw/autotune", post(run_autotune_endpoint))
        // Phase 16.2: background autotune agent
        .route("/agent/status", get(agent_status))
        .route("/agent/stop", post(agent_stop))
        .route("/agent/enqueue/{index_name}", post(agent_enqueue))
        // ADR-019: Lance hybrid backend
        .route("/lance/migrate/{index_name}", post(lance_migrate))
        .route("/lance/index/{index_name}", post(lance_build_index))
        .route("/lance/search/{index_name}", post(lance_search))
        .route("/lance/doc/{index_name}/{doc_id}", get(lance_get_doc))
        .route("/lance/append/{index_name}", post(lance_append))
        .route("/lance/stats/{index_name}", get(lance_stats))
        .route("/lance/scalar-index/{index_name}/{column}", post(lance_build_scalar_index))
        .route("/lance/recall/{index_name}", post(lance_recall_harness))
        // Phase 19: playbook memory — the meta-index feedback loop
        .route("/playbook_memory/rebuild", post(rebuild_playbook_memory))
        .route("/playbook_memory/stats", get(playbook_memory_stats))
        .route("/playbook_memory/seed", post(seed_playbook_memory))
        .route("/playbook_memory/persist_sql", post(persist_playbook_memory_sql))
        .route("/playbook_memory/patterns", post(discover_playbook_patterns))
        .route("/playbook_memory/mark_failed", post(mark_playbook_failed))
        .route("/playbook_memory/retire", post(retire_playbook_memory))
        .route("/playbook_memory/revise", post(revise_playbook_memory))
        .route("/playbook_memory/history/{id}", get(playbook_memory_history))
        .route("/playbook_memory/status", get(playbook_memory_status))
        // Phase 45 slice 3 — doc drift detection + human re-admission.
        .route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift))
        .route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift))
        // Phase 45 closure (2026-04-27) — batch scan across all active
        // playbooks. Operator runs this on a schedule (cron or manual);
        // each newly-detected drift writes a row to
        // data/_kb/doc_drift_corrections.jsonl for downstream review.
        .route("/playbook_memory/doc_drift/scan", post(scan_doc_drift))
        // Pathway memory — consensus-designed sidecar (2026-04-24).
        // scrum_master_pipeline POSTs /pathway/insert at the end of each
        // review, calls /pathway/query before running the ladder for a
        // potential hot-swap, and posts /pathway/record_replay after a
        // hot-swap succeeds or fails.
        .route("/pathway/insert", post(pathway_insert))
        .route("/pathway/query", post(pathway_query))
        .route("/pathway/record_replay", post(pathway_record_replay))
        .route("/pathway/stats", get(pathway_stats))
        // ADR-021 Phase C: pre-review bug-fingerprint retrieval.
        .route("/pathway/bug_fingerprints", post(pathway_bug_fingerprints))
        // Mem0 ops (J 2026-04-25): upsert/retire/revise/history.
        .route("/pathway/upsert", post(pathway_upsert))
        .route("/pathway/retire", post(pathway_retire))
        .route("/pathway/revise", post(pathway_revise))
        .route("/pathway/history/{trace_uid}", get(pathway_history))
        .with_state(state)
}
/// GET /health — liveness probe. Touches no state; returns a fixed body.
async fn health() -> &'static str {
    // Exact string matters: external smoke checks may look for it verbatim.
    const BODY: &str = "vectord ok";
    BODY
}
// --- Background Index Creation ---
/// Body for POST /index.
#[derive(Deserialize)]
struct CreateIndexRequest {
    /// Name the finished index is registered under.
    index_name: String,
    /// Logical source/dataset name, recorded in the index metadata.
    source: String,
    /// Documents to chunk and embed.
    documents: Vec<DocInput>,
    /// Chunk size, defaulting to 500 — units are whatever
    /// `chunker::chunk_column` uses (presumably characters or tokens;
    /// confirm in the chunker module).
    chunk_size: Option<usize>,
    /// Overlap between consecutive chunks; defaults to 50.
    overlap: Option<usize>,
    /// Federation layer 2: optional bucket to hold this index's trial
    /// journal + promotion file. Defaults to "primary" — pre-existing
    /// clients that don't know about federation keep working unchanged.
    #[serde(default)]
    bucket: Option<String>,
}
/// One input document for index creation.
#[derive(Deserialize)]
struct DocInput {
    // Caller-assigned document id; passed to the chunker as the doc id.
    id: String,
    // Full text to be chunked and embedded.
    text: String,
}
/// 202 body for POST /index — job handle plus chunking stats.
#[derive(Serialize)]
struct CreateIndexResponse {
    // Poll /vectors/jobs/{job_id} for embedding progress.
    job_id: String,
    index_name: String,
    // Number of input documents received.
    documents: usize,
    // Number of chunks produced — the unit of embedding work.
    chunks: usize,
    // Human-readable polling hint.
    message: String,
}
/// POST /index — chunk the submitted documents synchronously, then spawn
/// a supervised background task that embeds them and registers the index.
/// Returns 202 ACCEPTED immediately with a job id the caller polls via
/// /vectors/jobs/{id}.
///
/// Errors: 400 when chunking produces nothing to index.
async fn create_index(
    State(state): State<VectorState>,
    Json(req): Json<CreateIndexRequest>,
) -> impl IntoResponse {
    let chunk_size = req.chunk_size.unwrap_or(500);
    let overlap = req.overlap.unwrap_or(50);
    // Chunk synchronously (fast) — embedding is the slow part and runs
    // in the spawned task below.
    let doc_ids: Vec<String> = req.documents.iter().map(|d| d.id.clone()).collect();
    let texts: Vec<String> = req.documents.iter().map(|d| d.text.clone()).collect();
    let chunks = chunker::chunk_column(&req.source, &doc_ids, &texts, chunk_size, overlap);
    if chunks.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "no text to index".to_string()));
    }
    let n_docs = req.documents.len();
    let n_chunks = chunks.len();
    let index_name = req.index_name.clone();
    let bucket = req.bucket.clone().unwrap_or_else(|| "primary".to_string());
    // Create the job record first so its id can be returned immediately.
    let job_id = state.job_tracker.create_embed(&index_name, n_chunks).await;
    tracing::info!("job {job_id}: indexing '{}' — {} docs → {} chunks (background)", index_name, n_docs, n_chunks);
    // Clone every handle the background task needs — it outlives this
    // request, so nothing borrowed from `state` may cross into the spawn.
    let tracker = state.job_tracker.clone();
    let ai_client = state.ai_client.clone();
    let obj_store = state.store.clone();
    let registry = state.index_registry.clone();
    let jid = job_id.clone();
    let source_name = req.source.clone();
    let idx_name = req.index_name.clone();
    tokio::spawn(async move {
        let start_time = std::time::Instant::now();
        let config = supervisor::SupervisorConfig::default();
        let result = supervisor::run_supervised(
            &jid, &idx_name, chunks, &ai_client, &obj_store, &tracker, config,
        ).await;
        match result {
            Ok(key) => {
                let elapsed = start_time.elapsed().as_secs_f32();
                // Guard against division by zero on a sub-measurable run.
                let rate = if elapsed > 0.0 { n_chunks as f32 / elapsed } else { 0.0 };
                // Register index metadata with model version info.
                let meta = index_registry::IndexMeta {
                    index_name: idx_name.clone(),
                    source: source_name,
                    model_name: "nomic-embed-text".to_string(), // from sidecar config
                    model_version: "latest".to_string(),
                    dimensions: 768,
                    chunk_count: n_chunks,
                    doc_count: n_docs,
                    // Field-init shorthand (was `chunk_size: chunk_size`).
                    chunk_size,
                    overlap,
                    storage_key: key.clone(),
                    created_at: chrono::Utc::now(),
                    build_time_secs: elapsed,
                    chunks_per_sec: rate,
                    bucket: bucket.clone(),
                    vector_backend: shared::types::VectorBackend::Parquet,
                    id_prefix: None,
                    last_used: None,
                    build_signature: None,
                };
                // Best-effort: a registration failure doesn't fail the job —
                // the embeddings are already stored under `key`.
                let _ = registry.register(meta).await;
                tracker.complete(&jid, Some(json!({ "storage_key": key }))).await;
                tracing::info!("job {jid}: completed — {n_chunks} chunks in {elapsed:.0}s ({rate:.0}/sec)");
            }
            Err(e) => {
                tracker.fail(&jid, e.clone()).await;
                tracing::error!("job {jid}: failed — {e}");
            }
        }
    });
    Ok((StatusCode::ACCEPTED, Json(CreateIndexResponse {
        job_id,
        index_name: req.index_name,
        documents: n_docs,
        chunks: n_chunks,
        message: format!("embedding {} chunks in background — poll /vectors/jobs/{{id}} for progress", n_chunks),
    })))
}
// --- Index Registry ---
/// Query params for GET /indexes — both filters are optional.
#[derive(Deserialize)]
struct IndexListQuery {
    // Filter by the index's recorded source dataset when present.
    source: Option<String>,
    // Filter by model name when present.
    model: Option<String>,
}
/// GET /indexes — list registered indexes, optionally filtered by
/// source and/or model via query params.
async fn list_indexes(
    State(state): State<VectorState>,
    Query(q): Query<IndexListQuery>,
) -> impl IntoResponse {
    let by_source = q.source.as_deref();
    let by_model = q.model.as_deref();
    Json(state.index_registry.list(by_source, by_model).await)
}
async fn get_index_meta(
State(state): State<VectorState>,
Path(name): Path<String>,
) -> impl IntoResponse {
match state.index_registry.get(&name).await {
Some(meta) => Ok(Json(meta)),
None => Err((StatusCode::NOT_FOUND, format!("index not found: {name}"))),
}
}
/// Body for PATCH /indexes/{name}/bucket.
#[derive(Deserialize)]
struct MigrateBucketRequest {
    /// Registered bucket name to move the index's artifacts into.
    dest_bucket: String,
    /// If true, delete artifacts from the source bucket after the pointer
    /// flip. Default false — keeping source copies means a failed migration
    /// is recoverable by editing IndexMeta.bucket back, and a successful
    /// migration leaves inspectable forensics until an operator sweeps.
    #[serde(default)]
    delete_source: bool,
}
/// Result of a bucket migration — what moved, what was absent, what
/// was cleaned up, and how long it took.
#[derive(Serialize)]
struct MigrateBucketReport {
    index_name: String,
    /// Bucket the index lived in before the migration.
    source_bucket: String,
    dest_bucket: String,
    /// Artifact keys that were copied (or attempted). Order follows copy order.
    copied: Vec<String>,
    /// Artifact prefixes that had nothing to copy (optional files missing,
    /// trial journal empty, etc).
    skipped: Vec<String>,
    /// Subset of `copied` that was subsequently deleted from the source.
    deleted_source: Vec<String>,
    duration_secs: f32,
}
/// Move an index's artifacts from its current bucket to `dest_bucket`.
/// Parquet-backed indexes only — Lance migration needs URI rewriting that
/// isn't in scope for this endpoint. Copies the vector data, trial journal,
/// promotion file, and auto-generated harness; updates `IndexMeta.bucket`
/// last so a mid-flight failure leaves the index still usable at its
/// original location. Evicts the `EmbeddingCache` entry so the next load
/// re-reads from the new bucket.
///
/// # Errors
/// 404 unknown index; 400 for a Lance backend, an unregistered dest
/// bucket, or a same-bucket no-op; 500 on storage/registry failures.
async fn migrate_index_bucket(
    State(state): State<VectorState>,
    Path(name): Path<String>,
    Json(req): Json<MigrateBucketRequest>,
) -> Result<Json<MigrateBucketReport>, (StatusCode, String)> {
    let t0 = std::time::Instant::now();
    let mut meta = state
        .index_registry
        .get(&name)
        .await
        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("index '{name}' not found")))?;
    if meta.vector_backend == shared::types::VectorBackend::Lance {
        return Err((
            StatusCode::BAD_REQUEST,
            "Lance-backed indexes cannot be migrated via this endpoint — \
             Lance URIs are bucket-specific; a separate migrate_lance tool \
             is needed".into(),
        ));
    }
    if !state.bucket_registry.contains(&req.dest_bucket) {
        return Err((
            StatusCode::BAD_REQUEST,
            format!("dest bucket '{}' not registered", req.dest_bucket),
        ));
    }
    let source_bucket = meta.bucket.clone();
    if source_bucket == req.dest_bucket {
        return Err((
            StatusCode::BAD_REQUEST,
            format!("source and dest are both '{source_bucket}' — nothing to migrate"),
        ));
    }
    // Resolve both bucket handles up front — fail fast before copying.
    let src = state
        .bucket_registry
        .get(&source_bucket)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    let dst = state
        .bucket_registry
        .get(&req.dest_bucket)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
    let mut copied: Vec<String> = Vec::new();
    let mut skipped: Vec<String> = Vec::new();
    // 1. Vector data (single parquet file for this backend). A failure
    //    here is a hard error — without the vectors there is no index.
    copy_key(&src, &dst, &meta.storage_key)
        .await
        .map_err(|e| {
            (StatusCode::INTERNAL_SERVER_ERROR,
             format!("copy {}: {e}", meta.storage_key))
        })?;
    copied.push(meta.storage_key.clone());
    // 2. Trial journal batches — per-index directory of JSONL files.
    //    A list failure is treated the same as an empty journal.
    let trial_prefix = format!("_hnsw_trials/{name}/");
    let trial_keys = storaged::ops::list(&src, Some(&trial_prefix))
        .await
        .unwrap_or_default();
    if trial_keys.is_empty() {
        skipped.push(trial_prefix);
    }
    for k in &trial_keys {
        copy_key(&src, &dst, k)
            .await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("copy {k}: {e}")))?;
        copied.push(k.clone());
    }
    // 3. Promotion file (optional — absent for never-promoted indexes).
    let promo_key = format!("_hnsw_promotions/{name}.json");
    match copy_key(&src, &dst, &promo_key).await {
        Ok(()) => copied.push(promo_key),
        Err(_) => skipped.push(promo_key),
    }
    // 4. Auto-generated harness (optional — absent if agent never ran).
    let harness_key = format!("_hnsw_evals/{name}_auto.json");
    match copy_key(&src, &dst, &harness_key).await {
        Ok(()) => copied.push(harness_key),
        Err(_) => skipped.push(harness_key),
    }
    // 5. Pointer flip — IndexMeta.bucket now points at destination. This
    //    is the commit point; earlier failures leave copies in dest but the
    //    index still usable at source.
    meta.bucket = req.dest_bucket.clone();
    state
        .index_registry
        .register(meta)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("update meta: {e}")))?;
    // 6. Cache eviction — next load reads the new bucket's parquet.
    state.embedding_cache.evict(&name).await;
    // 7. Optional source cleanup. Best-effort per key — a delete failure
    //    just leaves that key out of `deleted_source`.
    let mut deleted_source: Vec<String> = Vec::new();
    if req.delete_source {
        for k in &copied {
            if storaged::ops::delete(&src, k).await.is_ok() {
                deleted_source.push(k.clone());
            }
        }
    }
    Ok(Json(MigrateBucketReport {
        index_name: name,
        source_bucket,
        dest_bucket: req.dest_bucket,
        copied,
        skipped,
        deleted_source,
        duration_secs: t0.elapsed().as_secs_f32(),
    }))
}
/// Copy one object between buckets by reading it fully into memory and
/// writing it back out under the same key. `object_store` offers no
/// native cross-backend copy (local ↔ S3), so the in-memory hop is
/// unavoidable; peak memory is bounded by the largest single artifact,
/// which for our parquet + jsonl files tops out around a few hundred MB.
async fn copy_key(
    src: &Arc<dyn ObjectStore>,
    dst: &Arc<dyn ObjectStore>,
    key: &str,
) -> Result<(), String> {
    let payload = storaged::ops::get(src, key).await?;
    storaged::ops::put(dst, key, payload).await
}
// --- Job Status ---
/// GET /jobs — snapshot of every tracked background job.
async fn list_jobs(State(state): State<VectorState>) -> impl IntoResponse {
    Json(state.job_tracker.list().await)
}
async fn get_job(
State(state): State<VectorState>,
Path(id): Path<String>,
) -> impl IntoResponse {
match state.job_tracker.get(&id).await {
Some(job) => Ok(Json(job)),
None => Err((StatusCode::NOT_FOUND, format!("job not found: {id}"))),
}
}
// --- Search ---
/// Body for POST /search.
#[derive(Deserialize)]
struct SearchRequest {
    // Index to search against.
    index_name: String,
    // Natural-language query; embedded before searching.
    query: String,
    // Number of results to return; defaults to 5.
    top_k: Option<usize>,
}
/// Response for POST /search — ranked results plus the echoed query.
#[derive(Serialize)]
struct SearchResponse {
    results: Vec<search::SearchResult>,
    query: String,
}
/// POST /search — embed the query text, load the named index's vectors,
/// and return the top_k matches from `search::search`.
///
/// Errors: 502 when embedding fails or yields nothing; 404 when the
/// index's embeddings can't be loaded.
async fn search_index(
    State(state): State<VectorState>,
    Json(req): Json<SearchRequest>,
) -> impl IntoResponse {
    let top_k = req.top_k.unwrap_or(5);
    // Embed the query text first — nothing else is useful without it.
    let resp = state.ai_client.embed(EmbedRequest {
        texts: vec![req.query.clone()],
        model: None,
    }).await.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed error: {e}")))?;
    let Some(raw) = resp.embeddings.first() else {
        return Err((StatusCode::BAD_GATEWAY, "no embedding returned".to_string()));
    };
    // Cast each component down to f32 to match the stored vectors.
    let qvec: Vec<f32> = raw.iter().map(|&x| x as f32).collect();
    let embeddings = store::load_embeddings(&state.store, &req.index_name)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("index not found: {e}")))?;
    Ok(Json(SearchResponse {
        results: search::search(&qvec, &embeddings, top_k),
        query: req.query,
    }))
}
// --- RAG ---
/// Body for POST /rag.
#[derive(Deserialize)]
struct RagRequest {
    // Index to retrieve context from.
    index_name: String,
    // Question to answer from retrieved context.
    question: String,
    // Number of context chunks to retrieve; defaults to 5.
    top_k: Option<usize>,
}
async fn rag_query(
State(state): State<VectorState>,
Json(req): Json<RagRequest>,
) -> impl IntoResponse {
let top_k = req.top_k.unwrap_or(5);
match rag::query(&req.question, &req.index_name, top_k, &state.store, &state.ai_client).await {
Ok(resp) => Ok(Json(resp)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// --- Hybrid SQL+Vector Search ---
//
// The fix for the core RAG gap: vector search alone can't do structured
// filtering (state, role, reliability threshold). SQL alone can't do
// semantic similarity ("who could handle this kind of work"). Hybrid
// does both: SQL narrows to structurally-valid candidates, vector
// ranks them by semantic relevance, LLM generates from verified context.
/// Body for POST /hybrid — SQL pre-filter + vector rerank + optional
/// LLM answer generation.
#[derive(Deserialize)]
struct HybridRequest {
    /// Natural language question — used for embedding + LLM generation.
    question: String,
    /// Vector index to search against.
    index_name: String,
    /// SQL WHERE clause to pre-filter. Applied against the index's source
    /// dataset. Example: "state = 'IL' AND reliability > 0.8"
    /// Safety: runs through DataFusion's parser so injection is bounded
    /// by what DataFusion accepts (no DDL, no writes).
    #[serde(default)]
    sql_filter: Option<String>,
    /// Dataset to run the SQL filter against. Defaults to the index's
    /// source if omitted.
    #[serde(default)]
    filter_dataset: Option<String>,
    /// Column in the SQL result that maps to the vector index's doc_id.
    /// Default: "worker_id" (for the Ethereal dataset) or "candidate_id".
    #[serde(default)]
    id_column: Option<String>,
    /// Number of final results. Default comes from `default_top_k`
    /// (defined elsewhere in this file).
    #[serde(default = "default_top_k")]
    top_k: usize,
    /// If true, generate an LLM answer from the matched context.
    /// If false, just return the ranked matches (faster, no Ollama gen).
    #[serde(default = "default_true")]
    generate: bool,
    /// Phase 19: consult `playbook_memory` and boost workers that past
    /// similar playbooks successfully filled. Off by default so current
    /// callers keep deterministic ranking; opt-in unlocks the feedback.
    #[serde(default)]
    use_playbook_memory: bool,
    /// Number of past playbooks to consider when `use_playbook_memory`
    /// is on. Ignored otherwise. Defaults to 5.
    #[serde(default)]
    playbook_memory_k: Option<usize>,
}
/// serde default helper — fields tagged `default = "default_true"`
/// (e.g. HybridRequest.generate) start out enabled.
fn default_true() -> bool {
    true
}
/// Response for POST /hybrid.
#[derive(serde::Serialize)]
struct HybridResponse {
    // Echo of the input question.
    question: String,
    // Echo of the SQL pre-filter, if one was supplied.
    sql_filter: Option<String>,
    // Distinct IDs the SQL filter matched (0 when no filter ran).
    sql_matches: usize,
    // Number of sources returned after ranking/boosting.
    vector_reranked: usize,
    // "hybrid_sql_vector" when a SQL filter applied, else "vector_only".
    method: String,
    // LLM answer; None when generation was skipped or failed.
    answer: Option<String>,
    sources: Vec<HybridSource>,
    // Presumably whole-request wall clock in ms — set past this view; confirm.
    duration_ms: u64,
}
/// One ranked hit in a /hybrid response.
#[derive(serde::Serialize)]
struct HybridSource {
    // Document id as stored in the vector index (may carry an id prefix).
    doc_id: String,
    // The matched chunk's text, used as LLM context when generating.
    chunk_text: String,
    // Vector similarity score, plus any playbook boost already applied.
    score: f32,
    // True when this hit passed the SQL pre-filter (always false on the
    // vector-only path).
    sql_verified: bool,
    /// Phase 19: how much the playbook_memory boost lifted this hit's
    /// score. 0.0 when `use_playbook_memory=false` or no past playbook
    /// endorsed this worker.
    #[serde(default, skip_serializing_if = "is_zero")]
    playbook_boost: f32,
    /// playbook_ids whose endorsement contributed to `playbook_boost`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    playbook_citations: Vec<String>,
}
/// True when `x` lies strictly inside (-1e-6, 1e-6) — used by serde to
/// omit `playbook_boost` from JSON when no boost was applied. NaN is
/// not "zero" (both comparisons fail), matching the abs-based form.
fn is_zero(x: &f32) -> bool {
    let v = *x;
    v > -1e-6 && v < 1e-6
}
async fn hybrid_search(
State(state): State<VectorState>,
Json(req): Json<HybridRequest>,
) -> impl IntoResponse {
let t0 = std::time::Instant::now();
// Step 1: If SQL filter provided, run it to get the set of valid IDs.
let valid_ids: Option<std::collections::HashSet<String>> = if let Some(ref filter) = req.sql_filter {
let index_meta = state.index_registry.get(&req.index_name).await;
let dataset = req.filter_dataset.clone()
.or_else(|| index_meta.map(|m| m.source.clone()))
.unwrap_or_else(|| req.index_name.clone());
let id_col = req.id_column.clone().unwrap_or_else(|| "worker_id".into());
let sql = format!("SELECT CAST({id_col} AS VARCHAR) AS id FROM {dataset} WHERE {filter}");
tracing::info!("hybrid: SQL filter → {sql}");
// Use queryd through the catalog — same engine as /query/sql
// Use the query engine to get JSON rows — avoids Arrow type
// wrangling across DataFusion's Utf8View/StringViewArray variants.
let engine = queryd::context::QueryEngine::new(
state.catalog.clone(),
state.bucket_registry.clone(),
queryd::cache::MemCache::new(0),
);
match engine.query(&sql).await {
Ok(batches) => {
use arrow::array::{Array, AsArray};
let mut ids = std::collections::HashSet::new();
for batch in &batches {
if let Some(col) = batch.column_by_name("id") {
// DataFusion CAST(x AS VARCHAR) → StringViewArray.
// Try StringView first, then String, then Int.
if let Some(arr) = col.as_string_view_opt() {
for i in 0..arr.len() {
if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
}
} else if let Some(arr) = col.as_string_opt::<i32>() {
for i in 0..arr.len() {
if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
}
} else {
// Fallback: try as Int32/Int64 (if CAST didn't happen)
if let Some(arr) = col.as_any().downcast_ref::<arrow::array::Int32Array>() {
for i in 0..arr.len() {
if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
}
} else if let Some(arr) = col.as_any().downcast_ref::<arrow::array::Int64Array>() {
for i in 0..arr.len() {
if !arr.is_null(i) { ids.insert(arr.value(i).to_string()); }
}
}
}
}
}
tracing::info!("hybrid: SQL filter returned {} IDs", ids.len());
if ids.is_empty() { None } else { Some(ids) }
}
Err(e) => {
return Err((StatusCode::BAD_REQUEST, format!("SQL filter error: {e}")));
}
}
} else {
None
};
// Step 2: Vector search — embed question, search index.
let embed_resp = state.ai_client
.embed(EmbedRequest { texts: vec![req.question.clone()], model: None })
.await
.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
if embed_resp.embeddings.is_empty() {
return Err((StatusCode::BAD_GATEWAY, "no embedding".into()));
}
let qv: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
// When SQL-filtered: use brute-force cosine over all embeddings,
// then filter by SQL IDs, then take top_k. HNSW's ef_search caps
// results at ~30, which is too few to reliably intersect with
// narrow SQL filters. Brute-force on 10K vectors is ~50ms — fast
// enough for the hybrid path. Without SQL filter, use HNSW normally.
let all_results = if valid_ids.is_some() {
// Brute-force path: score ALL vectors, filter by SQL IDs later.
let embeddings = store::load_embeddings(&state.store, &req.index_name).await
.map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;
search::search(&qv, &embeddings, embeddings.len()) // score everything
} else if state.hnsw_store.has_index(&req.index_name).await {
state.hnsw_store.search(&req.index_name, &qv, req.top_k).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?
.into_iter()
.map(|h| search::SearchResult {
doc_id: h.doc_id,
chunk_text: h.chunk_text,
score: h.score,
source: h.source,
chunk_idx: h.chunk_idx as u32,
})
.collect::<Vec<_>>()
} else {
let embeddings = store::load_embeddings(&state.store, &req.index_name).await
.map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;
search::search(&qv, &embeddings, req.top_k)
};
// Step 3: Filter vector results to only SQL-verified IDs.
// ADR-020: read the index's id_prefix from the catalog instead of
// hardcoding prefix stripping. Falls back to heuristic for legacy indexes.
let id_prefix: Option<String> = state.index_registry
.get(&req.index_name).await
.and_then(|m| m.id_prefix.clone());
let sql_count = valid_ids.as_ref().map(|s| s.len()).unwrap_or(0);
// Phase 19: when playbook_memory is consulted, pull a wider candidate
// pool so endorsed workers outside the vanilla top-K can still be
// boosted into visibility. 5× is a conservative multiplier — plenty
// for a +0.25 boost to flip rankings without dragging the cost up.
let fetch_k = if req.use_playbook_memory { req.top_k * 5 } else { req.top_k };
let filtered: Vec<search::SearchResult> = if let Some(ref ids) = valid_ids {
all_results.into_iter()
.filter(|r| {
let raw_id = if let Some(ref prefix) = id_prefix {
r.doc_id.strip_prefix(prefix.as_str()).unwrap_or(&r.doc_id)
} else {
// Legacy: heuristic strip for pre-ADR-020 indexes
r.doc_id.strip_prefix("W500K-")
.or_else(|| r.doc_id.strip_prefix("W500-"))
.or_else(|| r.doc_id.strip_prefix("W5K-"))
.or_else(|| r.doc_id.strip_prefix("W-"))
.or_else(|| r.doc_id.strip_prefix("CAND-"))
.unwrap_or(&r.doc_id)
};
ids.contains(raw_id)
})
.take(fetch_k)
.collect()
} else {
all_results.into_iter().take(fetch_k).collect()
};
// Step 4: Build sources with SQL-verified flag.
let mut sources: Vec<HybridSource> = filtered.iter().map(|r| HybridSource {
doc_id: r.doc_id.clone(),
chunk_text: r.chunk_text.clone(),
score: r.score,
sql_verified: valid_ids.is_some(),
playbook_boost: 0.0,
playbook_citations: Vec::new(),
}).collect();
// Step 4b (Phase 19): if use_playbook_memory, look up semantically
// similar past playbooks and boost workers they endorsed. Name-match
// is on the tuple (city, state, name) extracted from chunk_text —
// hybrid_search's SQL filter already narrowed to one city+state, so
// this just needs to check the name against each playbook's endorsed
// set. Additive boost on the existing vector score, then re-sort.
if req.use_playbook_memory {
let boost_k = req.playbook_memory_k.unwrap_or(playbook_memory::DEFAULT_TOP_K_PLAYBOOKS);
// Extract target (city, state, role) from the SQL filter so
// compute_boost_for can skip playbooks from other cities AND
// prioritize exact role matches via the multi-strategy path.
// The executor's filter shape is stable:
// `... role = 'Welder' AND city = 'Toledo' AND state = 'OH' ...`.
// Case-insensitive match, tolerant of single quotes.
let target_geo = req.sql_filter.as_deref().and_then(extract_target_geo);
let target_role = req.sql_filter.as_deref().and_then(extract_target_role);
// We embedded the question as `qv` above — reuse it for the
// playbook similarity lookup so we don't double-pay Ollama.
let boosts = state.playbook_memory
.compute_boost_for_filtered_with_role(
&qv,
boost_k,
0.5,
target_geo.as_ref().map(|(c, s)| (c.as_str(), s.as_str())),
target_role.as_deref(),
)
.await;
// Diagnostics for Phase 19 boost pipeline. Logged so item 3
// investigation has ground truth:
// - boosts.len(): how many (city,state,name) keys surfaced for
// this query (0 = playbook_memory found nothing semantically
// similar to the question).
// - parsed: how many candidate chunks parsed cleanly into
// (name,city,state) via parse_worker_chunk.
// - matched: how many parsed keys matched an entry in boosts.
// 2026-04-21 — 20-scenario batch showed 34/40 ok combos never
// got a citation. These counters pin whether the gap is on the
// SIMILARITY side (boosts empty) or the MATCH side (parsed vs
// boosted keys mismatch — e.g. name format drift).
let mut parsed_count = 0usize;
let mut matched_count = 0usize;
for src in sources.iter_mut() {
// Parse "{Name} — {Role} in {City}, {State}. …" chunk. Being
// defensive: chunks from other datasets may not follow this
// exact shape, so absent fields just skip the boost.
if let Some((name, city, state)) = parse_worker_chunk(&src.chunk_text) {
parsed_count += 1;
let key = (city, state, name);
if let Some(entry) = boosts.get(&key) {
src.score += entry.boost;
src.playbook_boost = entry.boost;
src.playbook_citations = entry.citations.clone();
matched_count += 1;
}
}
}
tracing::info!(
"playbook_boost: boosts={} sources={} parsed={} matched={} target_geo={:?} target_role={:?} (query='{}')",
boosts.len(),
sources.len(),
parsed_count,
matched_count,
target_geo,
target_role,
req.question.chars().take(60).collect::<String>(),
);
// Re-rank: boosted scores can flip ordering.
sources.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
// Finally trim to the caller's requested top_k — we pulled fetch_k
// (5× wider) above specifically so the boost could reach workers
// that would otherwise have been trimmed pre-boost.
sources.truncate(req.top_k);
}
// Step 5: Generate answer if requested.
let answer = if req.generate && !sources.is_empty() {
let context: String = sources.iter().enumerate().map(|(i, s)| {
format!("[{}] (id: {}, verified: {}) {}", i + 1, s.doc_id, s.sql_verified, s.chunk_text)
}).collect::<Vec<_>>().join("\n\n");
let gen_resp = state.ai_client.generate(GenerateRequest {
prompt: format!(
"You are a staffing intelligence assistant. Answer based ONLY on these \
verified worker records. Every record has been SQL-verified against the \
database — you can trust the facts in them. Be specific: cite names, \
skills, certifications, scores, and locations.\n\n\
Records:\n{context}\n\n\
Question: {}\n\nAnswer:", req.question,
),
model: None,
system: None,
temperature: Some(0.2),
max_tokens: Some(512),
// Hybrid's answer step — prose output over retrieved records,
// no reasoning needed on the hot path.
think: Some(false),
}).await;
gen_resp.ok().map(|r| r.text.trim().to_string())
} else {
None
};
let method = if valid_ids.is_some() { "hybrid_sql_vector" } else { "vector_only" };
Ok(Json(HybridResponse {
question: req.question,
sql_filter: req.sql_filter,
sql_matches: sql_count,
vector_reranked: sources.len(),
method: method.into(),
answer,
sources,
duration_ms: t0.elapsed().as_millis() as u64,
}))
}
// --- HNSW Fast Search ---
/// Request body for the HNSW build endpoint (see `build_hnsw`).
#[derive(Deserialize)]
struct BuildHnswRequest {
    /// Name of the stored vector index to build HNSW from
    index_name: String,
    /// Optional config override. Omit to use the production default
    /// (ec=80 es=30 — see HnswConfig::default docs for rationale).
    #[serde(default)]
    config: Option<trial::HnswConfig>,
}
/// Build an HNSW index from an existing stored vector index.
/// Uses the embedding cache so repeated builds don't reload from Parquet.
async fn build_hnsw(
State(state): State<VectorState>,
Json(req): Json<BuildHnswRequest>,
) -> impl IntoResponse {
let config = req.config.unwrap_or_default();
tracing::info!(
"building HNSW for '{}' ef_construction={} ef_search={}",
req.index_name, config.ef_construction, config.ef_search,
);
let embeddings = state
.embedding_cache
.get_or_load(&req.index_name)
.await
.map_err(|e| (StatusCode::NOT_FOUND, format!("index not found: {e}")))?;
match state
.hnsw_store
.build_index_with_config(&req.index_name, (*embeddings).clone(), &config)
.await
{
Ok(stats) => Ok(Json(stats)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// Request body for HNSW search: which index, what text, how many hits.
#[derive(Deserialize)]
struct HnswSearchRequest {
    /// Name of the in-memory HNSW index to query.
    index_name: String,
    /// Free-text query; embedded server-side before the ANN lookup.
    query: String,
    /// Number of results to return; defaults to 5 when omitted.
    top_k: Option<usize>,
}
/// Search using HNSW — approximate nearest neighbors, much faster than brute-force.
async fn search_hnsw(
State(state): State<VectorState>,
Json(req): Json<HnswSearchRequest>,
) -> impl IntoResponse {
let top_k = req.top_k.unwrap_or(5);
// Embed query
let embed_resp = state.ai_client.embed(EmbedRequest {
texts: vec![req.query.clone()],
model: None,
}).await.map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed error: {e}")))?;
if embed_resp.embeddings.is_empty() {
return Err((StatusCode::BAD_GATEWAY, "no embedding returned".to_string()));
}
let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
// Search HNSW
match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await {
Ok(results) => Ok(Json(serde_json::json!({
"results": results,
"query": req.query,
"method": "hnsw",
}))),
Err(e) => Err((StatusCode::NOT_FOUND, e)),
}
}
/// List every HNSW index currently held by the store.
async fn list_hnsw(State(state): State<VectorState>) -> impl IntoResponse {
    let indexes = state.hnsw_store.list().await;
    Json(indexes)
}
// --- Trial System: parameterized HNSW tuning loop ---
//
// Flow:
// 1. Agent picks an HnswConfig
// 2. POST /hnsw/trial builds HNSW with that config against cached embeddings,
// runs every query in the harness, measures latency + recall vs the
// harness's ground truth, appends a Trial record to _hnsw_trials/{idx}.jsonl
// 3. Agent reads GET /hnsw/trials/{index}, sees history, decides next config
// 4. Repeat until converged.
//
// The first trial triggers embedding load (slow). Every subsequent trial reuses
// the cache — so the agent iterates in seconds, not minutes.
/// Request body for one HNSW tuning trial.
#[derive(Deserialize)]
struct TrialRequest {
    /// Index whose cached embeddings the trial builds against.
    index_name: String,
    /// Name of the eval harness supplying queries + ground truth.
    harness: String,
    /// HNSW parameters under test; struct defaults when omitted.
    #[serde(default)]
    config: trial::HnswConfig,
    /// Optional free-form note stored with the trial record.
    #[serde(default)]
    note: Option<String>,
}
/// Run one HNSW tuning trial end-to-end: load the harness, (lazily)
/// build and persist its ground truth, build a throwaway HNSW index
/// with the requested config, benchmark every harness query, compute
/// mean recall@k and latency percentiles, journal the result, and
/// finally drop the throwaway index. Returns the journaled record.
async fn run_trial(
    State(state): State<VectorState>,
    Json(req): Json<TrialRequest>,
) -> Result<Json<trial::Trial>, (StatusCode, String)> {
    let mut harness_set = state.harness_store.load_for_index(&req.index_name, &req.harness)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("harness not found: {e}")))?;
    // Defensive cross-check: the harness records which index it was
    // built for; refuse a mismatched pairing with a 400.
    if harness_set.index_name != req.index_name {
        return Err((
            StatusCode::BAD_REQUEST,
            format!(
                "harness '{}' is for index '{}', not '{}'",
                req.harness, harness_set.index_name, req.index_name
            ),
        ));
    }
    if harness_set.queries.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "harness has no queries".into()));
    }
    // First trial pays the embedding load; later trials hit the cache.
    let embeddings = state
        .embedding_cache
        .get_or_load(&req.index_name)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;
    // Ground truth is computed at most once per harness and persisted,
    // so recall numbers stay comparable across trials.
    if !harness_set.ground_truth_built {
        tracing::info!("trial: computing ground truth for harness '{}'", harness_set.name);
        let t0 = std::time::Instant::now();
        harness::compute_ground_truth(&mut harness_set, &embeddings, &state.ai_client)
            .await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ground truth: {e}")))?;
        tracing::info!("trial: ground truth built in {:.1}s", t0.elapsed().as_secs_f32());
        state.harness_store
            .save(&harness_set)
            .await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("save harness: {e}")))?;
    }
    // Build into a trial-scoped slot name so the index's real HNSW (or
    // a concurrent trial) is never clobbered; dropped at the end.
    let trial_id = trial::Trial::new_id();
    let hnsw_slot = format!("{}__{}", req.index_name, trial_id);
    let build_stats = state
        .hnsw_store
        .build_index_with_config(&hnsw_slot, (*embeddings).clone(), &req.config)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("build: {e}")))?;
    // NOTE(review): filter_map silently skips queries with no stored
    // embedding, while the zip below pairs results with queries by
    // position — this assumes every query has an embedding. Confirm
    // compute_ground_truth guarantees that invariant.
    let query_vectors: Vec<Vec<f32>> = harness_set
        .queries
        .iter()
        .filter_map(|q| q.query_embedding.clone())
        .collect();
    let bench = state
        .hnsw_store
        .bench_search(&hnsw_slot, &query_vectors, harness_set.k)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("search: {e}")))?;
    // Mean recall@k, over queries that actually carry ground truth.
    let mut recalls = Vec::with_capacity(harness_set.queries.len());
    for (q, hits) in harness_set.queries.iter().zip(bench.retrieved.iter()) {
        if let Some(gt) = &q.ground_truth {
            recalls.push(harness::recall_at_k(hits, gt, harness_set.k));
        }
    }
    let mean_recall = if recalls.is_empty() {
        0.0
    } else {
        recalls.iter().sum::<f32>() / recalls.len() as f32
    };
    // Latency percentiles via a rounded index into the sorted list.
    let mut lats = bench.latencies_us.clone();
    lats.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    let p = |pct: f32| -> f32 {
        if lats.is_empty() { return 0.0; }
        let idx = ((lats.len() as f32 - 1.0) * pct).round() as usize;
        lats[idx.min(lats.len() - 1)]
    };
    // One brute-force reference latency — keeps the cost proportional to
    // whatever the agent is willing to pay per trial.
    let brute_latency_us = if let Some(qv) = query_vectors.first() {
        let t0 = std::time::Instant::now();
        let _ = harness::brute_force_top_k(qv, &embeddings, harness_set.k);
        t0.elapsed().as_micros() as f32
    } else {
        0.0
    };
    // Rough memory estimate: raw f32 vectors plus ~128 bytes of
    // per-entry overhead.
    let dims = embeddings.first().map(|e| e.vector.len()).unwrap_or(0);
    let memory_bytes =
        (embeddings.len() * dims * std::mem::size_of::<f32>() + embeddings.len() * 128) as u64;
    let trial_record = trial::Trial {
        id: trial_id.clone(),
        index_name: req.index_name.clone(),
        eval_set: req.harness.clone(),
        config: req.config.clone(),
        metrics: trial::TrialMetrics {
            build_time_secs: build_stats.build_time_secs,
            search_latency_p50_us: p(0.50),
            search_latency_p95_us: p(0.95),
            search_latency_p99_us: p(0.99),
            recall_at_k: mean_recall,
            memory_bytes,
            vectors: build_stats.vectors,
            eval_queries: harness_set.queries.len(),
            brute_force_latency_us: brute_latency_us,
        },
        created_at: chrono::Utc::now(),
        note: req.note,
    };
    // Journal first, then free the throwaway index.
    state
        .trial_journal
        .append(&trial_record)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("journal: {e}")))?;
    state.hnsw_store.drop(&hnsw_slot).await;
    Ok(Json(trial_record))
}
async fn list_trials(
State(state): State<VectorState>,
Path(index_name): Path<String>,
) -> impl IntoResponse {
match state.trial_journal.list(&index_name).await {
Ok(trials) => Ok(Json(trials)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// Query string for the best-trial lookup.
#[derive(Deserialize)]
struct BestTrialQuery {
    /// Selection metric; defaults to "pareto" when omitted.
    #[serde(default = "default_metric")]
    metric: String,
}
/// Serde default for `BestTrialQuery::metric`.
fn default_metric() -> String {
    String::from("pareto")
}
/// Return the best trial for an index under the requested metric;
/// 404 when no trials have been recorded yet.
async fn best_trial(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Query(q): Query<BestTrialQuery>,
) -> impl IntoResponse {
    let outcome = state.trial_journal.best(&index_name, &q.metric).await;
    match outcome {
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
        Ok(Some(trial)) => Ok(Json(trial)),
        Ok(None) => Err((StatusCode::NOT_FOUND, "no trials yet".to_string())),
    }
}
// --- Harness management ---
/// List every stored eval harness.
async fn list_evals(State(state): State<VectorState>) -> impl IntoResponse {
    let all = state.harness_store.list_all().await;
    Json(all)
}
async fn get_eval(
State(state): State<VectorState>,
Path(name): Path<String>,
) -> impl IntoResponse {
match state.harness_store.get_any(&name).await {
Ok(e) => Ok(Json(e)),
Err(err) => Err((StatusCode::NOT_FOUND, err)),
}
}
/// Create or replace an eval harness under the path-supplied name.
/// `ground_truth_built` is recomputed server-side: it is only true
/// when every query already carries a ground-truth list.
async fn put_eval(
    State(state): State<VectorState>,
    Path(name): Path<String>,
    Json(mut harness_set): Json<harness::EvalSet>,
) -> impl IntoResponse {
    // The URL path, not the body, is authoritative for the name.
    harness_set.name = name;
    let complete = harness_set.queries.iter().all(|q| q.ground_truth.is_some());
    harness_set.ground_truth_built = complete;
    match state.harness_store.save(&harness_set).await {
        Ok(()) => Ok(Json(harness_set)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
/// Request body for eval-harness auto-generation.
#[derive(Deserialize)]
struct AutogenRequest {
    /// Index whose chunks seed the synthetic queries.
    index_name: String,
    /// How many synthetic queries to sample (default 100).
    #[serde(default = "default_sample_count")]
    sample_count: usize,
    /// k used for ground truth / recall@k (default 10).
    #[serde(default = "default_k")]
    k: usize,
}
/// Serde default for `AutogenRequest::sample_count`.
fn default_sample_count() -> usize {
    100
}
/// Serde default for `AutogenRequest::k`.
fn default_k() -> usize {
    10
}
/// Auto-generate a synthetic eval harness from an index's own chunks,
/// compute its ground truth, and persist it under `name`.
async fn autogen_eval(
    State(state): State<VectorState>,
    Path(name): Path<String>,
    Json(req): Json<AutogenRequest>,
) -> Result<Json<harness::EvalSet>, (StatusCode, String)> {
    let embeddings = state
        .embedding_cache
        .get_or_load(&req.index_name)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("load embeddings: {e}")))?;
    // Sample chunks into synthetic queries…
    let mut eval_set = harness::synthetic_from_chunks(
        &name,
        &req.index_name,
        &embeddings,
        req.sample_count,
        req.k,
    );
    // …then compute the exact ground truth for each query.
    harness::compute_ground_truth(&mut eval_set, &embeddings, &state.ai_client)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ground truth: {e}")))?;
    state
        .harness_store
        .save(&eval_set)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("save: {e}")))?;
    Ok(Json(eval_set))
}
// --- Embedding cache management ---
/// Report embedding-cache statistics.
async fn cache_stats(State(state): State<VectorState>) -> impl IntoResponse {
    let stats = state.embedding_cache.stats().await;
    Json(stats)
}
/// Evict one index's embeddings from the cache; reports whether
/// anything was actually evicted.
async fn cache_evict(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    let evicted = state.embedding_cache.evict(&index_name).await;
    Json(serde_json::json!({ "evicted": evicted, "index_name": index_name }))
}
// --- Phase C: embedding refresh ---
//
// Decouples "new row data arrived" from "re-embed everything." Ingest marks
// a dataset's embeddings stale (see catalogd::registry::mark_embeddings_stale);
// `/vectors/refresh/{dataset}` diffs existing embeddings against current
// rows, embeds only the new ones, appends to the index, and clears the
// stale flag.
async fn refresh_dataset(
State(state): State<VectorState>,
Path(dataset_name): Path<String>,
Json(req): Json<refresh::RefreshRequest>,
) -> Result<Json<refresh::RefreshResult>, (StatusCode, String)> {
tracing::info!(
"refresh requested for dataset '{}' -> index '{}'",
dataset_name, req.index_name,
);
match refresh::refresh_index(
&dataset_name,
&req,
&state.store,
&state.catalog,
&state.ai_client,
&state.embedding_cache,
&state.index_registry,
)
.await
{
Ok(result) => Ok(Json(result)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// One row in the stale-embeddings report (see `list_stale`).
#[derive(Serialize)]
struct StaleEntry {
    dataset_name: String,
    /// RFC 3339 timestamp of the last successful embed, if any.
    last_embedded_at: Option<String>,
    /// RFC 3339 timestamp of when the dataset was marked stale;
    /// empty string when the marker is absent.
    stale_since: String,
    refresh_policy: Option<shared::types::RefreshPolicy>,
}
/// List every dataset whose embeddings are flagged stale, with
/// timestamps rendered as RFC 3339 strings.
async fn list_stale(State(state): State<VectorState>) -> impl IntoResponse {
    let mut entries = Vec::new();
    for d in state.catalog.stale_datasets().await {
        let stale_since = d
            .embedding_stale_since
            .map(|t| t.to_rfc3339())
            .unwrap_or_default();
        entries.push(StaleEntry {
            dataset_name: d.name,
            last_embedded_at: d.last_embedded_at.map(|t| t.to_rfc3339()),
            stale_since,
            refresh_policy: d.embedding_refresh_policy,
        });
    }
    Json(entries)
}
// --- Phase 17: Model profile activation + scoped search ---
/// Summary recorded in the job tracker when a profile activation's
/// background task completes.
#[derive(Serialize)]
struct ActivateReport {
    profile_id: String,
    /// Ollama model name bound to this profile.
    ollama_name: String,
    indexes_warmed: Vec<WarmedIndex>,
    /// Per-step failure messages; activation keeps going past
    /// individual failures and reports them all here.
    failures: Vec<String>,
    total_vectors: usize,
    duration_secs: f32,
    /// Phase C: did we successfully preload the Ollama model?
    model_preloaded: bool,
    /// Phase C: which profile previously held the GPU slot, if any.
    /// Useful for observability of the swap.
    previous_profile: Option<String>,
}
/// One successfully warmed index inside an `ActivateReport`.
#[derive(Serialize)]
struct WarmedIndex {
    index_name: String,
    /// Dataset/view name the index was built from.
    source: String,
    /// Vector count loaded/migrated for this index.
    vectors: usize,
    /// Wall-clock warm-up time for this index; the already-populated
    /// Lance path records 0.0 since no build was needed.
    hnsw_build_secs: f32,
}
/// Warm this profile's indexes. For every bound dataset, find the
/// matching vector index (any index whose `source` equals the dataset
/// or view name), load its embeddings into EmbeddingCache, build HNSW
/// with the profile's config. Next `/profile/{id}/search` call is then
/// <1ms cold.
///
/// Failures on individual indexes don't stop the activation — they get
/// reported in the response. This matches the "substrate keeps working"
/// philosophy from ADR-017: one bad binding shouldn't take down the
/// whole profile.
///
/// Returns 202 immediately; the warm-up runs in a spawned task whose
/// result lands in the job tracker under the returned `job_id`.
async fn activate_profile(
    State(state): State<VectorState>,
    Path(profile_id): Path<String>,
) -> impl IntoResponse {
    tracing::info!("[activate_profile] START profile_id={}", profile_id);
    let profile = match state.catalog.get_profile(&profile_id).await {
        Some(p) => p,
        None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
    };
    let job_id = state.job_tracker.create_profile_activation(&profile_id).await;
    let job_id_for_response = job_id.clone();
    // Clone everything the background task needs — the handler returns
    // before the task finishes, so the task can't borrow from `state`.
    let tracker = state.job_tracker.clone();
    let _catalog = state.catalog.clone();
    let index_registry = state.index_registry.clone();
    let bucket_registry = state.bucket_registry.clone();
    let lance = state.lance.clone();
    let embedding_cache = state.embedding_cache.clone();
    let hnsw_store = state.hnsw_store.clone();
    let promotion_registry = state.promotion_registry.clone();
    let ai_client = state.ai_client.clone();
    let active_profile = state.active_profile.clone();
    let profile_name = profile.ollama_name.clone();
    let profile_id_clone = profile.id.clone();
    let profile_bucket = profile.bucket.clone();
    let profile_bound = profile.bound_datasets.clone();
    let profile_hnsw = profile.hnsw_config.clone();
    let profile_backend = profile.vector_backend.clone();
    let _profile_full = profile.clone();
    tokio::spawn(async move {
        let t0 = std::time::Instant::now();
        let mut warmed = Vec::new();
        let mut failures = Vec::new();
        let mut total_vectors = 0usize;
        let job_id = job_id;
        // Snapshot whoever currently holds the GPU slot so their model
        // can be unloaded before ours is loaded.
        let previous_slot = {
            let guard = active_profile.read().await;
            guard.clone()
        };
        if let Some(prev) = &previous_slot {
            // Same model name means nothing to swap — skip the unload.
            if prev.ollama_name != profile_name {
                match ai_client.unload_model(&prev.ollama_name).await {
                    Ok(_) => tracing::info!(
                        "profile swap: unloaded '{}' ({} -> {})",
                        prev.ollama_name, prev.profile_id, profile_id_clone,
                    ),
                    Err(e) => failures.push(format!("unload previous model '{}': {e}", prev.ollama_name)),
                }
            }
        }
        // Auto-provision the profile's bucket as a local-backend bucket
        // when it isn't registered yet; ':' is sanitized out of the path.
        if let Some(bucket_name) = profile_bucket.clone() {
            if !bucket_registry.contains(&bucket_name) {
                let root = format!(
                    "{}/{}",
                    bucket_registry.profile_root().trim_end_matches('/'),
                    bucket_name.replace(':', "_"),
                );
                let bc = shared::config::BucketConfig {
                    name: bucket_name.clone(),
                    backend: "local".to_string(),
                    root: Some(root.clone()),
                    bucket: None,
                    region: None,
                    endpoint: None,
                    secret_ref: None,
                };
                match bucket_registry.add_bucket(bc).await {
                    Ok(info) => {
                        tracing::info!(
                            "profile '{}' activated bucket '{}' (root={}, reachable={})",
                            profile_id_clone, bucket_name, root, info.reachable,
                        );
                    }
                    Err(e) => failures.push(format!("auto-provision bucket '{}': {}", bucket_name, e)),
                }
            }
        }
        let all_indexes = index_registry.list(None, None).await;
        let use_lance = profile_backend == shared::types::VectorBackend::Lance;
        // Warm every index whose `source` matches a bound dataset.
        // Individual failures are recorded and the loop keeps going.
        for binding in &profile_bound {
            let matched: Vec<_> = all_indexes
                .iter()
                .filter(|m| &m.source == binding)
                .collect();
            if matched.is_empty() {
                failures.push(format!("no vector index found for binding '{}'", binding));
                continue;
            }
            for meta in matched {
                if use_lance {
                    // Lance path: materialize the dataset (auto-migrate
                    // from Parquet when empty), then make sure the
                    // IVF_PQ and doc_id btree indexes exist.
                    let bucket = meta.bucket.clone();
                    let lance_store = match lance.store_for_new(&meta.index_name, &bucket).await {
                        Ok(s) => s,
                        Err(e) => { failures.push(format!("{}: lance store init: {e}", meta.index_name)); continue; }
                    };
                    let count = lance_store.count().await.unwrap_or(0);
                    if count == 0 {
                        let pq_store = match bucket_registry.get(&bucket) {
                            Ok(s) => s,
                            Err(e) => { failures.push(format!("{}: bucket: {e}", meta.index_name)); continue; }
                        };
                        match storaged::ops::get(&pq_store, &meta.storage_key).await {
                            Ok(bytes) => {
                                let build_t = std::time::Instant::now();
                                match lance_store.migrate_from_parquet_bytes(&bytes).await {
                                    Ok(ms) => {
                                        total_vectors += ms.rows_written;
                                        tracing::info!("lance auto-migrate '{}': {} rows in {:.2}s", meta.index_name, ms.rows_written, ms.duration_secs);
                                        warmed.push(WarmedIndex {
                                            index_name: meta.index_name.clone(),
                                            source: meta.source.clone(),
                                            vectors: ms.rows_written,
                                            hnsw_build_secs: build_t.elapsed().as_secs_f32(),
                                        });
                                    }
                                    Err(e) => failures.push(format!("{}: lance migrate: {e}", meta.index_name)),
                                }
                            }
                            Err(e) => failures.push(format!("{}: read parquet: {e}", meta.index_name)),
                        }
                    } else {
                        // Already materialized — no migration build ran.
                        total_vectors += count;
                        warmed.push(WarmedIndex {
                            index_name: meta.index_name.clone(),
                            source: meta.source.clone(),
                            vectors: count,
                            hnsw_build_secs: 0.0,
                        });
                    }
                    // NOTE(review): these index builds also run when the
                    // migrate above failed (that arm has no `continue`) —
                    // confirm building on an empty store is a cheap no-op.
                    if !lance_store.has_vector_index().await.unwrap_or(false) {
                        match lance_store.build_index(316, 8, 48).await {
                            Ok(ix) => tracing::info!("lance auto-index '{}': IVF_PQ built in {:.1}s", meta.index_name, ix.build_time_secs),
                            Err(e) => failures.push(format!("{}: lance IVF_PQ build: {e}", meta.index_name)),
                        }
                    }
                    if !lance_store.has_scalar_index("doc_id").await.unwrap_or(false) {
                        match lance_store.build_scalar_index("doc_id").await {
                            Ok(ix) => tracing::info!("lance auto-index '{}': doc_id btree built in {:.2}s", meta.index_name, ix.build_time_secs),
                            Err(e) => failures.push(format!("{}: lance doc_id btree: {e}", meta.index_name)),
                        }
                    }
                } else {
                    // Parquet/HNSW path: cache the embeddings and build
                    // an in-memory HNSW with the promoted config when
                    // one exists, else the profile's own config.
                    let embeddings = match embedding_cache.get_or_load(&meta.index_name).await {
                        Ok(arc) => arc,
                        Err(e) => { failures.push(format!("{}: load failed: {}", meta.index_name, e)); continue; }
                    };
                    total_vectors += embeddings.len();
                    let profile_default = trial::HnswConfig {
                        ef_construction: profile_hnsw.ef_construction,
                        ef_search: profile_hnsw.ef_search,
                        seed: profile_hnsw.seed,
                    };
                    let cfg = promotion_registry
                        .config_or(&meta.index_name, profile_default)
                        .await;
                    let build_t = std::time::Instant::now();
                    match hnsw_store
                        .build_index_with_config(&meta.index_name, (*embeddings).clone(), &cfg)
                        .await
                    {
                        Ok(_) => {
                            warmed.push(WarmedIndex {
                                index_name: meta.index_name.clone(),
                                source: meta.source.clone(),
                                vectors: embeddings.len(),
                                hnsw_build_secs: build_t.elapsed().as_secs_f32(),
                            });
                        }
                        Err(e) => failures.push(format!("{}: HNSW build failed: {}", meta.index_name, e)),
                    }
                }
            }
        }
        // Pull the profile's Ollama model into memory so the first
        // search/generate call after activation doesn't pay the load.
        let mut model_preloaded = false;
        match ai_client.preload_model(&profile_name).await {
            Ok(_) => {
                model_preloaded = true;
                tracing::info!("profile '{}' preloaded ollama model '{}'", profile_id_clone, profile_name);
            }
            Err(e) => failures.push(format!("preload ollama model '{}': {e}", profile_name)),
        }
        // Claim the active slot only after the warm-up work is done.
        {
            let mut guard = active_profile.write().await;
            *guard = Some(ActiveProfileSlot {
                profile_id: profile_id_clone.clone(),
                ollama_name: profile_name.clone(),
                activated_at: chrono::Utc::now(),
            });
        }
        let result = serde_json::to_value(ActivateReport {
            profile_id: profile_id_clone,
            ollama_name: profile_name,
            indexes_warmed: warmed,
            failures,
            total_vectors,
            duration_secs: t0.elapsed().as_secs_f32(),
            model_preloaded,
            previous_profile: previous_slot.map(|s| s.profile_id),
        }).ok();
        tracker.complete(&job_id, result).await;
    });
    // PRD Phase 41 gate: "Activate a profile → returns 202 in <100ms
    // → job completes in background". 202 ACCEPTED signals async-work
    // started; clients poll /vectors/jobs/{job_id} for progress.
    Ok((StatusCode::ACCEPTED, Json(json!({
        "job_id": job_id_for_response,
        "message": format!("profile activation started — poll /vectors/jobs/{} for progress", job_id_for_response),
    }))))
}
/// Unload this profile's model and clear the active slot. No-op if the
/// caller isn't the currently-active profile.
async fn deactivate_profile(
    State(state): State<VectorState>,
    Path(profile_id): Path<String>,
) -> impl IntoResponse {
    let Some(profile) = state.catalog.get_profile(&profile_id).await else {
        return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}")));
    };
    // Clear the slot only when this profile currently holds it.
    let was_active = {
        let mut guard = state.active_profile.write().await;
        if guard.as_ref().is_some_and(|s| s.profile_id == profile_id) {
            guard.take()
        } else {
            None
        }
    };
    // Regardless of whether it held the slot, we can still try to unload —
    // the operator's intent is "get this model out of VRAM."
    let unload_result = state.ai_client.unload_model(&profile.ollama_name).await;
    Ok(Json(serde_json::json!({
        "profile_id": profile.id,
        "ollama_name": profile.ollama_name,
        "was_active": was_active.is_some(),
        "unloaded": unload_result.is_ok(),
        "unload_error": unload_result.err(),
    })))
}
/// Return a snapshot of the currently-active profile slot, if any.
async fn get_active_profile(State(state): State<VectorState>) -> impl IntoResponse {
    Json(state.active_profile.read().await.clone())
}
/// Request body for profile-scoped search.
#[derive(Deserialize)]
struct ProfileSearchRequest {
    /// Index to search; its source must be among the profile's
    /// bound datasets or the request is refused.
    index_name: String,
    /// Free-text query; embedded server-side.
    query: String,
    /// Result count; defaults to 5 when omitted.
    top_k: Option<usize>,
}
/// Search scoped to a profile — refuses if the requested index's source
/// isn't in the profile's bound_datasets. Reuses the existing HNSW
/// search path when the index is warm; falls back to brute-force cosine
/// if it's not (handled by the existing search code path).
async fn profile_scoped_search(
    State(state): State<VectorState>,
    Path(profile_id): Path<String>,
    Json(req): Json<ProfileSearchRequest>,
) -> impl IntoResponse {
    let profile = match state.catalog.get_profile(&profile_id).await {
        Some(p) => p,
        None => return Err((StatusCode::NOT_FOUND, format!("profile not found: {profile_id}"))),
    };
    // Verify the index is in scope for this profile.
    let index_meta = match state.index_registry.get(&req.index_name).await {
        Some(m) => m,
        None => return Err((StatusCode::NOT_FOUND, format!("index not found: {}", req.index_name))),
    };
    if !profile.bound_datasets.contains(&index_meta.source) {
        // 403, not 404: the index exists, but this profile isn't
        // allowed to reach it.
        return Err((
            StatusCode::FORBIDDEN,
            format!(
                "profile '{}' is not bound to '{}' — allowed bindings: {:?}",
                profile.id, index_meta.source, profile.bound_datasets,
            ),
        ));
    }
    let top_k = req.top_k.unwrap_or(5);
    let use_lance = profile.vector_backend == shared::types::VectorBackend::Lance;
    // Embed the query.
    let embed_resp = state
        .ai_client
        .embed(EmbedRequest { texts: vec![req.query.clone()], model: None })
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
    if embed_resp.embeddings.is_empty() {
        return Err((StatusCode::BAD_GATEWAY, "no embedding returned".into()));
    }
    let query_vec: Vec<f32> = embed_resp.embeddings[0].iter().map(|&x| x as f32).collect();
    // ADR-019 hybrid: route to Lance or Parquet+HNSW based on the
    // profile's declared backend. Callers don't need to know which
    // storage tier they're hitting — the profile abstracts it.
    if use_lance {
        let lance_store = state.lance.store_for(&req.index_name).await
            .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
        let t0 = std::time::Instant::now();
        match lance_store.search(
            &query_vec,
            top_k,
            Some(LANCE_DEFAULT_NPROBES),
            Some(LANCE_DEFAULT_REFINE_FACTOR),
        ).await {
            Ok(hits) => Ok(Json(serde_json::json!({
                "profile": profile.id,
                "source": index_meta.source,
                "method": "lance_ivf_pq",
                "latency_us": t0.elapsed().as_micros() as u64,
                "results": hits,
            }))),
            Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
        }
    } else if state.hnsw_store.has_index(&req.index_name).await {
        // Warm HNSW — the fast path after profile activation.
        match state.hnsw_store.search(&req.index_name, &query_vec, top_k).await {
            Ok(hits) => Ok(Json(serde_json::json!({
                "profile": profile.id,
                "source": index_meta.source,
                "method": "hnsw",
                "results": hits,
            }))),
            Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
        }
    } else {
        // Cold fallback: exact brute-force scan over cached embeddings.
        let embeddings = state
            .embedding_cache
            .get_or_load(&req.index_name)
            .await
            .map_err(|e| (StatusCode::NOT_FOUND, format!("embeddings: {e}")))?;
        let results = search::search(&query_vec, &embeddings, top_k);
        Ok(Json(serde_json::json!({
            "profile": profile.id,
            "source": index_meta.source,
            "method": "brute_force",
            "results": results,
        })))
    }
}
// --- Phase 16: Promotion + autotune ---
/// Query params for trial promotion — audit metadata only.
#[derive(Deserialize)]
struct PromoteQuery {
    /// Who promoted it (free-form; empty string when omitted).
    #[serde(default)]
    promoted_by: String,
    /// Optional note stored with the promotion entry.
    #[serde(default)]
    note: Option<String>,
}
async fn promote_trial(
State(state): State<VectorState>,
Path((index_name, trial_id)): Path<(String, String)>,
Query(q): Query<PromoteQuery>,
) -> impl IntoResponse {
// Pull the trial from the journal to get its config.
let trials = state
.trial_journal
.list(&index_name)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
let trial = trials
.iter()
.find(|t| t.id == trial_id)
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("trial not found: {trial_id}")))?;
let entry = promotion::PromotionEntry {
config: trial.config.clone(),
trial_id: trial.id.clone(),
promoted_at: chrono::Utc::now(),
promoted_by: q.promoted_by,
note: q.note,
};
match state.promotion_registry.promote(&index_name, entry).await {
Ok(file) => Ok(Json(file)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
async fn rollback_promotion(
State(state): State<VectorState>,
Path(index_name): Path<String>,
) -> impl IntoResponse {
match state.promotion_registry.rollback(&index_name).await {
Ok(file) => Ok(Json(file)),
Err(e) => Err((StatusCode::NOT_FOUND, e)),
}
}
async fn get_promoted(
State(state): State<VectorState>,
Path(index_name): Path<String>,
) -> impl IntoResponse {
match state.promotion_registry.load(&index_name).await {
Ok(file) => Ok(Json(file)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// Kick off a full autotune run, delegating all orchestration to
/// `autotune::run_autotune` with the stores held in VectorState.
async fn run_autotune_endpoint(
    State(state): State<VectorState>,
    Json(req): Json<autotune::AutotuneRequest>,
) -> impl IntoResponse {
    let outcome = autotune::run_autotune(
        req,
        &state.store,
        &state.catalog,
        &state.ai_client,
        &state.embedding_cache,
        &state.hnsw_store,
        &state.index_registry,
        &state.trial_journal,
        &state.promotion_registry,
        &state.harness_store,
        &state.job_tracker,
    )
    .await;
    match outcome {
        Ok(result) => Ok(Json(result)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
// --- Phase 16.2: autotune agent endpoints ---
/// Report the autotune agent's current status.
async fn agent_status(State(state): State<VectorState>) -> impl IntoResponse {
    let status = state.agent_handle.status().await;
    Json(status)
}
/// Ask the autotune agent to stop; reports whether a stop happened.
async fn agent_stop(State(state): State<VectorState>) -> impl IntoResponse {
    Json(serde_json::json!({ "stopped": state.agent_handle.stop().await }))
}
/// Manually enqueue an autotune trigger for one index; 503 when the
/// agent can't accept it.
async fn agent_enqueue(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
) -> impl IntoResponse {
    let event = agent::TriggerEvent::manual(index_name);
    state
        .agent_handle
        .enqueue(event)
        .await
        .map(|()| Json(serde_json::json!({ "enqueued": true })))
        .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e))
}
// --- ADR-019: Lance hybrid backend HTTP surface ---
//
// Lance routes operate on the same `index_name` as the Parquet/HNSW path,
// but materialize the data as a Lance dataset on disk under
// `{bucket_root}/lance/{index_name}/`. The two backends are independent:
// you can have an index in both formats simultaneously. `IndexMeta.vector_backend`
// records which one is canonical for that index.
/// Request body for Parquet → Lance migration (see `lance_migrate`).
#[derive(Deserialize)]
struct LanceMigrateRequest {
    /// Optional bucket override. Defaults to whatever the existing
    /// IndexMeta says, or "primary" for indexes that don't exist yet.
    #[serde(default)]
    bucket: Option<String>,
}
/// Read the existing Parquet vector file for `index_name` from object
/// storage, hand the bytes to vectord-lance, return migration stats.
/// The original Parquet file is left intact — both backends coexist
/// after migration.
///
/// Errors: 404 when the index or its Parquet file doesn't exist,
/// 400 for an unknown bucket; Lance-layer errors are sanitized via
/// `sanitize_lance_err` before reaching the caller.
async fn lance_migrate(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceMigrateRequest>,
) -> impl IntoResponse {
    let meta = state.index_registry.get(&index_name).await
        .ok_or((StatusCode::NOT_FOUND, format!("index not found: {index_name}")))?;
    // unwrap_or_else so the registry bucket is cloned only when the
    // caller didn't supply an override (clippy::or_fun_call — the
    // eager `unwrap_or(meta.bucket.clone())` cloned unconditionally).
    let bucket = req.bucket.unwrap_or_else(|| meta.bucket.clone());
    // Pull the Parquet bytes via storaged::ops — same path as the
    // existing embedding loader uses.
    let store = state.bucket_registry.get(&bucket)
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    let bytes = storaged::ops::get(&store, &meta.storage_key).await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("read parquet: {e}")))?;
    let lance_store = state.lance.store_for_new(&index_name, &bucket).await
        .map_err(|e| sanitize_lance_err(e, &index_name))?;
    let stats = lance_store.migrate_from_parquet_bytes(&bytes).await
        .map_err(|e| sanitize_lance_err(e, &index_name))?;
    tracing::info!(
        "lance migrate '{}': {} rows, {}d, {} bytes on disk, {:.2}s",
        index_name, stats.rows_written, stats.dimensions,
        stats.disk_bytes, stats.duration_secs,
    );
    // Auto-build the doc_id btree. The scalar index is what makes
    // get_doc_by_id O(log n) instead of a full table scan; ADR-019
    // calls this out as the load-bearing feature for hybrid lookup.
    // Verified 2026-05-02: skipping this on a 10M-row dataset turns
    // ~5ms doc-fetch into ~100ms (full scan over 35GB). Cheap to
    // build (~1.2s on 10M, +269MB on disk) and only runs once per
    // dataset since `has_scalar_index` short-circuits subsequent calls.
    let scalar_stats = if !lance_store.has_scalar_index("doc_id").await.unwrap_or(false) {
        match lance_store.build_scalar_index("doc_id").await {
            Ok(s) => {
                tracing::info!(
                    "lance migrate '{}': doc_id btree built in {:.2}s (+{} bytes)",
                    index_name, s.build_time_secs, s.disk_bytes_added,
                );
                Some(s)
            }
            Err(e) => {
                // Don't fail the whole migrate over a missing btree —
                // the dataset is still queryable, just slowly. Log it
                // so it's debuggable.
                tracing::warn!("lance migrate '{}': doc_id btree build failed (will fall back to scan): {e}", index_name);
                None
            }
        }
    } else {
        None
    };
    Ok::<_, (StatusCode, String)>(Json(serde_json::json!({
        "index_name": index_name,
        "bucket": bucket,
        "lance_path": lance_store.path(),
        "stats": stats,
        "scalar_index": scalar_stats,
    })))
}
/// IVF_PQ build parameters for a Lance vector index.
#[derive(Deserialize)]
struct LanceIndexRequest {
    /// IVF partition count (default 316 ≈ √100K for the reference dataset).
    #[serde(default = "default_partitions")]
    num_partitions: u32,
    /// PQ bits per code (default 8).
    #[serde(default = "default_bits")]
    num_bits: u32,
    /// PQ subvector count (default 48 → 768/48 = 16 dims per subvector).
    #[serde(default = "default_subvectors")]
    num_sub_vectors: u32,
}
/// Serde default for `num_partitions`: ≈√100K — sane for the reference dataset.
fn default_partitions() -> u32 {
    316
}
/// Serde default for `num_bits` (PQ bits per code).
fn default_bits() -> u32 {
    8
}
/// Serde default for `num_sub_vectors`: 768/48 = 16 dims per subvector.
fn default_subvectors() -> u32 {
    48
}
/// Sanitize a Lance backend error before it reaches the HTTP caller.
///
/// Two jobs:
/// 1. Classify "dataset is missing" errors as HTTP 404 instead of 500 —
///    a lookup miss is not an internal failure. The 404 branch is
///    deliberately narrow (tightened over two 2026-05-02→03 review
///    waves): the message must mention "dataset" AND carry a
///    missing-shape phrase, and column/field/schema-not-found messages
///    are explicitly excluded — those are genuine 500s. A bare
///    registry-file "no such file or directory" without dataset context
///    also stays a 500.
/// 2. Redact server-side filesystem paths (including the
///    `/root/.cargo/registry/...` crate paths with dependency versions)
///    from the message body via `redact_paths`, so probes can't map the
///    server's directory layout. Redaction replaces rather than
///    truncates: when the error STARTS with a path, the structural
///    "what failed" text after it is preserved.
///
/// Surfaced 2026-05-02 by the Lance backend audit: missing-index
/// search returned 500 + leaked the lakehouse data path AND the
/// .cargo/registry path with crate versions.
fn sanitize_lance_err(err: String, index_name: &str) -> (StatusCode, String) {
    let lower = err.to_lowercase();
    // Missing-shape phrase — Lance's real format is
    // "Dataset at path X was not found", which satisfies this.
    let missing_shape = lower.contains("not found") || lower.contains("does not exist");
    // Shapes that contain "not found" but are real 500s.
    let excluded_shape = ["column not found", "field not found", "schema not found"]
        .iter()
        .any(|p| lower.contains(p));
    if lower.contains("dataset") && missing_shape && !excluded_shape {
        return (
            StatusCode::NOT_FOUND,
            format!("lance dataset not found: {index_name}"),
        );
    }
    // Keep the structural error text, strip the locations.
    let cleaned = redact_paths(&err)
        .trim_end_matches([',', ' ', '\n', '\t'])
        .to_string();
    let msg = if cleaned.is_empty() {
        // Redaction can erase everything only if the message was pure
        // path — fall back to a generic-but-attributed line.
        format!("lance backend error on {index_name}")
    } else {
        cleaned
    };
    (StatusCode::INTERNAL_SERVER_ERROR, msg)
}
/// Replace path-shaped substrings (under known leak-prone roots) with
/// "[REDACTED]". Single linear pass, no regex. A redaction begins where
/// one of the configured prefixes matches and swallows the path body up
/// to a path-terminating byte (whitespace, quote, comma, closing
/// bracket) or end of input.
///
/// Two prefix families:
/// - absolute ("/root/.cargo", "/home", "/var", "/tmp", "/etc", "/usr",
///   "/opt") — matched anywhere, but only when followed by '/', a
///   terminator, or end-of-input, so "/etcd" is never mistaken for
///   "/etc";
/// - the same roots without the leading slash ("home/", "etc/", …) —
///   Lance has been observed echoing dataset paths with the slash
///   stripped ("Dataset at path home/profit/... was not found"). These
///   match only at the start of a fresh token, so "homecoming" and
///   "etcetera" survive.
///
/// Unmatched stretches are emitted as `&str` slices — never
/// byte-by-byte — which keeps multi-byte UTF-8 sequences intact.
fn redact_paths(s: &str) -> String {
    const ABSOLUTE: &[&str] = &[
        "/root/.cargo", "/home", "/var", "/tmp", "/etc", "/usr", "/opt",
    ];
    const RELATIVE: &[&str] = &[
        "root/.cargo", "home/", "var/", "tmp/", "etc/", "usr/", "opt/",
    ];
    /// Bytes that end a path run. All ASCII (< 0x80), so byte-level
    /// checks are sound even inside multi-byte text — UTF-8
    /// continuation bytes are always >= 0x80.
    fn terminates_path(b: u8) -> bool {
        matches!(b, b' ' | b'\t' | b'\n' | b'\r' | b'"' | b'\'' | b',' | b')' | b']' | b'}')
    }
    let bytes = s.as_bytes();
    let mut out = String::with_capacity(s.len());
    let mut run_start = 0usize; // start of the current unmatched stretch
    let mut pos = 0usize; // always on a char boundary of `s`
    while pos < bytes.len() {
        let tail = &s[pos..];
        // Absolute prefixes: allowed at any position, but the byte
        // right after the prefix must be '/', a terminator, or EOF.
        let mut hit = ABSOLUTE.iter().find_map(|p| {
            let rest = tail.strip_prefix(*p)?;
            match rest.as_bytes().first() {
                None => Some(p.len()),
                Some(&b) if b == b'/' || terminates_path(b) => Some(p.len()),
                _ => None,
            }
        });
        // Relative prefixes: only when this position opens a fresh
        // token (previous byte is not identifier-ish).
        if hit.is_none() {
            let fresh = pos == 0 || {
                let prev = bytes[pos - 1];
                !(prev.is_ascii_alphanumeric() || matches!(prev, b'_' | b'.' | b'-'))
            };
            if fresh {
                hit = RELATIVE.iter().find(|p| tail.starts_with(*p)).map(|p| p.len());
            }
        }
        if let Some(prefix_len) = hit {
            // Flush the pending unmatched run as a UTF-8-safe slice,
            // then stand in the marker for the whole path.
            out.push_str(&s[run_start..pos]);
            out.push_str("[REDACTED]");
            let mut end = pos + prefix_len;
            while end < bytes.len() && !terminates_path(bytes[end]) {
                end += 1;
            }
            pos = end;
            run_start = pos;
        } else {
            // Step a whole character so every slice boundary we ever
            // take stays on a char boundary.
            pos += tail.chars().next().map_or(1, char::len_utf8);
        }
    }
    out.push_str(&s[run_start..]);
    out
}
/// Length in bytes of the UTF-8 character whose leading byte sits at
/// index `i`. Callers guarantee `i` is a valid sequence start; a
/// continuation byte (0x80..=0xBF) should never be passed, but is
/// answered with 1 defensively so a caller's cursor can't stall.
fn utf8_char_len(bytes: &[u8], i: usize) -> usize {
    match bytes[i] {
        0x00..=0x7F => 1, // ASCII
        0x80..=0xBF => 1, // continuation byte — defensive
        0xC0..=0xDF => 2, // 2-byte sequence lead
        0xE0..=0xEF => 3, // 3-byte sequence lead
        _ => 4,           // 4-byte sequence lead
    }
}
// Unit tests for the sanitizing surface above (`sanitize_lance_err` +
// `redact_paths`). Several are regression tests pinned to dated review
// findings — keep the dates in the comments; they map to scrum notes.
#[cfg(test)]
mod sanitize_tests {
    use super::*;
    #[test]
    fn redact_path_at_offset_zero() {
        // Regression: opus BLOCK 2026-05-02. Old impl returned Some("")
        // when err started with "/home/", erasing the whole message.
        let out = redact_paths("/home/profit/lakehouse/data/lance not a directory");
        assert_eq!(out, "[REDACTED] not a directory");
    }
    #[test]
    fn redact_keeps_pre_and_post_text() {
        let out = redact_paths("failed to open /home/profit/lakehouse/data/x for read: ENOENT");
        assert_eq!(out, "failed to open [REDACTED] for read: ENOENT");
    }
    #[test]
    fn redact_multiple_paths() {
        let out = redact_paths("at /root/.cargo/registry/src/index.crates.io-foo/lance-table-4.0.0/src/io/commit.rs:364:26 from /home/profit/lakehouse");
        assert!(!out.contains("/root/.cargo"));
        assert!(!out.contains("/home/"));
        assert!(out.contains("[REDACTED]"));
    }
    #[test]
    fn redact_preserves_quote_terminator() {
        let out = redact_paths("{\"path\":\"/home/profit/x\",\"err\":\"bad\"}");
        assert_eq!(out, "{\"path\":\"[REDACTED]\",\"err\":\"bad\"}");
    }
    #[test]
    fn is_not_found_narrow_dataset_only() {
        // Regression: opus WARN 2026-05-02. Old impl 404'd on any "not
        // found" — including legitimate column/field-not-found 500s.
        let (status, _) = sanitize_lance_err(
            "column not found: vector".into(), "test_idx",
        );
        assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
        let (status, _) = sanitize_lance_err(
            "dataset not found at /home/profit/lakehouse/data/lance/missing".into(), "test_idx",
        );
        assert_eq!(status, StatusCode::NOT_FOUND);
    }
    #[test]
    fn redact_does_not_match_prefix_substring() {
        // /etcetera should NOT trigger /etc redaction.
        let out = redact_paths("etcetera and /etcd");
        assert_eq!(out, "etcetera and /etcd");
    }
    #[test]
    fn redact_relative_paths_lance_emits() {
        // 2026-05-02: live missing-index probe surfaced Lance error of the
        // form "Dataset at path home/profit/lakehouse/data/lance/x was not
        // found" — leading slash stripped. Need to redact the relative form
        // when preceded by a word boundary.
        let out = redact_paths("Dataset at path home/profit/lakehouse/data/lance/x was not found");
        assert!(!out.contains("home/profit"), "should redact: {out}");
        assert!(out.contains("Dataset at path"));
        assert!(out.contains("was not found"));
    }
    #[test]
    fn redact_does_not_eat_innocent_prefix_words() {
        // "homecoming" must NOT trigger "home/" redaction. "Etcetera" must
        // NOT trigger "etc/" redaction. The word-boundary guard handles this.
        let out = redact_paths("homecoming etcetera vary tmpfile");
        assert_eq!(out, "homecoming etcetera vary tmpfile");
    }
    #[test]
    fn is_not_found_lance_actual_phrasing() {
        // Lance's actual error format observed live: "Dataset at path X was
        // not found: Not found: ...". Must 404, not 500.
        let (status, _) = sanitize_lance_err(
            "Dataset at path home/profit/lakehouse/data/lance/x was not found".into(),
            "x",
        );
        assert_eq!(status, StatusCode::NOT_FOUND);
    }
    #[test]
    fn is_not_found_excludes_column_field_schema() {
        // Real 500s with the "not found" phrase that aren't dataset-missing.
        for err in [
            "column not found: vector",
            "field not found in schema: doc_id",
            "schema not found for dataset xyz",
        ] {
            let (status, _) = sanitize_lance_err(err.into(), "test_idx");
            assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR, "{err}");
        }
    }
    #[test]
    fn is_not_found_does_not_match_unrelated_path_missing() {
        // Regression: opus WARN at service.rs:1949 from the 2026-05-03
        // re-scrum. A registry-file error from inside a Lance internal
        // module should NOT be coerced to 404 just because it contains
        // "no such file or directory" — it's a real 500.
        let (status, _) = sanitize_lance_err(
            "/root/.cargo/registry/src/index.crates.io-foo/lance-table-4.0.0/src/io/commit.rs: no such file or directory".into(),
            "test_idx",
        );
        assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
        // (And the path is still redacted in the message.)
        let (_, msg) = sanitize_lance_err(
            "/root/.cargo/registry/src/lance-foo/x.rs: no such file or directory".into(),
            "test_idx",
        );
        assert!(!msg.contains("/root/.cargo"), "path leak: {msg}");
    }
    #[test]
    fn redact_preserves_multibyte_utf8() {
        // Regression: opus WARN at service.rs:2018 from the 2026-05-03
        // re-scrum. Old impl did `out.push(bytes[i] as char)` which
        // corrupted multi-byte UTF-8 (e.g. a path containing user-supplied
        // names with non-ASCII characters) into Latin-1 mojibake.
        let input = "Failed to open /home/profit/工作/data — café not found";
        let out = redact_paths(input);
        // The path is redacted...
        assert!(!out.contains("/home/profit"), "path leak: {out}");
        // ...AND the multi-byte characters elsewhere are preserved verbatim.
        assert!(out.contains("café"), "lost UTF-8: {out}");
        assert!(out.contains("not found"), "lost trailing context: {out}");
    }
}
/// Build the IVF_PQ index on the Lance dataset.
async fn lance_build_index(
State(state): State<VectorState>,
Path(index_name): Path<String>,
Json(req): Json<LanceIndexRequest>,
) -> impl IntoResponse {
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| sanitize_lance_err(e, &index_name))?;
match lance_store.build_index(req.num_partitions, req.num_bits, req.num_sub_vectors).await {
Ok(stats) => Ok(Json(stats)),
Err(e) => Err(sanitize_lance_err(e, &index_name)),
}
}
/// Request body for POST /lance/{index}/search.
#[derive(Deserialize)]
struct LanceSearchRequest {
    /// Plain text query — embedded server-side for symmetry with the
    /// existing /vectors/search path.
    query: String,
    // Result count; defaults to 5 via `default_top_k`.
    #[serde(default = "default_top_k")]
    top_k: usize,
    /// IVF partitions to probe. `None` uses Lance's built-in default of
    /// 1, which caps recall well below the index's real capability.
    /// Recommended: 510% of num_partitions (≈20 for a 316-partition
    /// index). Omitting it here picks the server-side default.
    #[serde(default)]
    nprobes: Option<usize>,
    /// Refine factor — re-rank `top_k * factor` PQ-approximate candidates
    /// with exact distances before returning `top_k`. Recovers recall
    /// lost to product quantization.
    #[serde(default)]
    refine_factor: Option<u32>,
}
/// Server-side defaults when the caller doesn't pin nprobes / refine
/// themselves. Tuned for the ~100K × 768d reference workload; see
/// docs/ADR-019-vector-storage.md for the recall / latency trade-off.
const LANCE_DEFAULT_NPROBES: usize = 20;
const LANCE_DEFAULT_REFINE_FACTOR: u32 = 5;

/// Default result count for search-style requests.
fn default_top_k() -> usize {
    5
}
/// Vector search against a Lance dataset. The query text is embedded
/// via the AI sidecar, then handed to Lance's nearest-neighbor scanner.
async fn lance_search(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceSearchRequest>,
) -> impl IntoResponse {
    // Embed the query text server-side, mirroring /vectors/search.
    let embed_resp = state.ai_client
        .embed(EmbedRequest { texts: vec![req.query.clone()], model: None })
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed: {e}")))?;
    let first = embed_resp.embeddings.first()
        .ok_or((StatusCode::BAD_GATEWAY, "no embedding returned".to_string()))?;
    let qv: Vec<f32> = first.iter().map(|&x| x as f32).collect();
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| sanitize_lance_err(e, &index_name))?;
    let started = std::time::Instant::now();
    // Fill in server-side defaults for any knob the caller omitted.
    let nprobes = Some(req.nprobes.unwrap_or(LANCE_DEFAULT_NPROBES));
    let refine = Some(req.refine_factor.unwrap_or(LANCE_DEFAULT_REFINE_FACTOR));
    let hits = lance_store.search(&qv, req.top_k, nprobes, refine).await
        .map_err(|e| sanitize_lance_err(e, &index_name))?;
    Ok(Json(serde_json::json!({
        "index_name": index_name,
        "query": req.query,
        "method": "lance_ivf_pq",
        "latency_us": started.elapsed().as_micros() as u64,
        "results": hits,
    })))
}
/// Random-access fetch by doc_id — the fast point lookup that's
/// basically impossible in our Parquet path without scanning the
/// whole file. Returns 404 when the doc_id doesn't exist.
async fn lance_get_doc(
    State(state): State<VectorState>,
    Path((index_name, doc_id)): Path<(String, String)>,
) -> impl IntoResponse {
    let store = state.lance.store_for(&index_name).await
        .map_err(|e| sanitize_lance_err(e, &index_name))?;
    let started = std::time::Instant::now();
    match store.get_by_doc_id(&doc_id).await {
        Ok(Some(row)) => Ok(Json(serde_json::json!({
            "index_name": index_name,
            "doc_id": doc_id,
            "latency_us": started.elapsed().as_micros() as u64,
            "row": row,
        }))),
        Ok(None) => Err((StatusCode::NOT_FOUND, format!("doc_id not found: {doc_id}"))),
        Err(e) => Err(sanitize_lance_err(e, &index_name)),
    }
}
/// Request body for POST /lance/{index}/append.
#[derive(Deserialize)]
struct LanceAppendRequest {
    /// Optional source tag — set on every appended row.
    #[serde(default)]
    source: Option<String>,
    // Rows to append; the handler rejects an empty array with 400.
    rows: Vec<LanceAppendRow>,
}
/// One row of an append request.
#[derive(Deserialize)]
struct LanceAppendRow {
    // Caller-assigned document id (the key used by /lance get-by-doc-id).
    doc_id: String,
    // Chunk ordinal within the document; the handler defaults it to 0.
    #[serde(default)]
    chunk_idx: Option<i32>,
    chunk_text: String,
    /// Pre-computed embedding. Caller is responsible for ensuring it
    /// matches the dataset's dimensions and embedding model.
    vector: Vec<f32>,
}
/// POST /lance/{index}/append — bulk-append caller-embedded rows.
///
/// Rejects an empty rows array with 400 up front. Backend errors now
/// go through `sanitize_lance_err` like every other Lance handler —
/// the previous raw `INTERNAL_SERVER_ERROR, e` passthrough on the
/// append call could leak server-side filesystem paths, the exact
/// class of leak the sanitizer was added to stop.
async fn lance_append(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceAppendRequest>,
) -> impl IntoResponse {
    if req.rows.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "rows array is empty".into()));
    }
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| sanitize_lance_err(e, &index_name))?;
    // Transpose the row-oriented request into the columnar vectors the
    // store's append API expects.
    let n = req.rows.len();
    let mut doc_ids = Vec::with_capacity(n);
    let mut chunk_idxs = Vec::with_capacity(n);
    let mut chunk_texts = Vec::with_capacity(n);
    let mut vectors = Vec::with_capacity(n);
    for r in req.rows {
        doc_ids.push(r.doc_id);
        chunk_idxs.push(r.chunk_idx.unwrap_or(0));
        chunk_texts.push(r.chunk_text);
        vectors.push(r.vector);
    }
    match lance_store.append(req.source, doc_ids, chunk_idxs, chunk_texts, vectors).await {
        Ok(stats) => Ok(Json(stats)),
        Err(e) => Err(sanitize_lance_err(e, &index_name)),
    }
}
async fn lance_stats(
State(state): State<VectorState>,
Path(index_name): Path<String>,
) -> impl IntoResponse {
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
match lance_store.stats().await {
Ok(s) => Ok(Json(s)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// Run an existing harness against Lance IVF_PQ and measure recall@k.
/// Uses the same ground truth computed by brute-force cosine (the HNSW
/// eval path). This closes ADR-019's explicit gap: "IVF_PQ recall not
/// measured."
#[derive(Deserialize)]
struct LanceRecallRequest {
    // Name of the eval harness to run — resolved by the handler
    // (outside this view).
    harness: String,
    #[serde(default = "default_top_k")]
    top_k: usize,
    /// Override server defaults so operators can sweep nprobes /
    /// refine_factor to chart the recall-vs-latency curve.
    #[serde(default)]
    nprobes: Option<usize>,
    #[serde(default)]
    refine_factor: Option<u32>,
}
/// Response body for the Lance recall eval. Serialization shape only —
/// the handler that populates it lies outside this file section.
#[derive(serde::Serialize)]
struct LanceRecallResult {
    index_name: String,
    harness: String,
    // Number of queries evaluated.
    queries: usize,
    top_k: usize,
    // Aggregate over per_query recall values — presumably the mean of
    // recall@top_k; confirm against the eval handler.
    mean_recall: f32,
    per_query: Vec<LanceRecallQuery>,
    // Latency percentiles across queries, in microseconds (per field names).
    latency_p50_us: f32,
    latency_p95_us: f32,
    total_duration_secs: f32,
}
/// Per-query row of the recall eval response.
#[derive(serde::Serialize)]
struct LanceRecallQuery {
    query_id: String,
    // Recall for this single query — presumably in [0.0, 1.0]; the
    // computation lives in the handler outside this view.
    recall: f32,
    latency_us: f32,
    hits_returned: usize,
}
// --- Phase 19: playbook memory endpoints ---
// NOTE(review): the doc comment that used to sit here described the
// "{Name} — {Role} in {City}, {State}. Skills: …" chunk parser — that
// is `parse_worker_chunk`, defined further below, NOT this function.
// Rustdoc concatenated it onto `extract_target_role`, documenting the
// wrong function, so it has been demoted to this plain comment.
/// Extract role from an SQL filter matching `role = 'Welder'` style.
/// Case-insensitive on the column name. Quoted value; quotes not
/// included in returned string. Delegates to `grab_eq_value`.
fn extract_target_role(sql_filter: &str) -> Option<String> {
    grab_eq_value(sql_filter, "role")
}
/// Shared equality-value extractor for (city, state, role) lookups.
///
/// Scans `src` for `col = 'value'` — case-insensitive on the column
/// name, arbitrary spaces around `=`, single-quoted value — and
/// returns the first non-empty value. The column name must start a
/// fresh token (so `city` never matches the tail of `velocity`).
/// Returns None when nothing matches, when the matched value is
/// empty (`col = ''`), or when `col` itself is empty.
fn grab_eq_value(src: &str, col: &str) -> Option<String> {
    // Guard: an empty column pattern makes `find("")` succeed at every
    // offset while `search_from` (advanced by `col_lower.len()` == 0)
    // never moves — an infinite loop. Bail out explicitly.
    if col.is_empty() {
        return None;
    }
    // to_ascii_lowercase is byte-length-preserving, so byte offsets in
    // `lower` are valid offsets into `src`.
    let lower = src.to_ascii_lowercase();
    let col_lower = col.to_ascii_lowercase();
    let mut search_from = 0usize;
    while let Some(off) = lower[search_from..].find(&col_lower) {
        let pos = search_from + off;
        // Column name must open a fresh token: start-of-string, or
        // preceded by a non-identifier byte.
        let prior_ok = pos == 0
            || (!lower.as_bytes()[pos - 1].is_ascii_alphanumeric()
                && lower.as_bytes()[pos - 1] != b'_');
        let after = pos + col_lower.len();
        if !prior_ok || after >= src.len() {
            search_from = pos + col_lower.len();
            continue;
        }
        // Expect: optional spaces, '=', optional spaces, opening quote.
        let mut i = after;
        while i < src.len() && src.as_bytes()[i] == b' ' { i += 1; }
        if i >= src.len() || src.as_bytes()[i] != b'=' { search_from = pos + col_lower.len(); continue; }
        i += 1;
        while i < src.len() && src.as_bytes()[i] == b' ' { i += 1; }
        if i >= src.len() || src.as_bytes()[i] != b'\'' { search_from = pos + col_lower.len(); continue; }
        i += 1;
        let start = i;
        // Consume up to the closing quote (ASCII, so byte scan is
        // UTF-8-safe: continuation bytes are >= 0x80).
        while i < src.len() && src.as_bytes()[i] != b'\'' { i += 1; }
        if i > start {
            return Some(src[start..i].to_string());
        }
        search_from = pos + col_lower.len();
    }
    None
}
/// Pull (city, state) out of a SQL filter that uses
/// `city = 'Toledo' AND state = 'OH'` style equality. Returns None if
/// either half is missing — the caller keeps the original global boost
/// map behavior (no geo narrowing). Column-name matching is
/// case-insensitive (via `grab_eq_value`), so `CITY=` or `City =`
/// also work.
fn extract_target_geo(sql_filter: &str) -> Option<(String, String)> {
    let city = grab_eq_value(sql_filter, "city")?;
    grab_eq_value(sql_filter, "state").map(|st| (city, st))
}
/// Extract (name, city, state) from a chunk formatted like
/// "{Name} — {Role} in {City}, {State}. Skills: …".
/// Splits on the em dash, then " in ", then the first comma; the state
/// code is the leading ASCII-alphabetic run after the comma. Returns
/// None when the chunk doesn't match the shape (callers simply skip
/// the boost for that hit) or any extracted piece is empty.
fn parse_worker_chunk(chunk: &str) -> Option<(String, String, String)> {
    let (raw_name, remainder) = chunk.split_once('—')?;
    let (_role, location) = remainder.trim().split_once(" in ")?;
    let (raw_city, state_tail) = location.trim().split_once(',')?;
    let state: String = state_tail
        .trim()
        .chars()
        .take_while(|c| c.is_ascii_alphabetic())
        .collect();
    let name = raw_name.trim();
    let city = raw_city.trim();
    if name.is_empty() || city.is_empty() || state.is_empty() {
        return None;
    }
    Some((name.to_string(), city.to_string(), state))
}
/// Request body for the seed endpoint (`seed_playbook_memory`): one
/// playbook entry plus optional Phase 25/45 lifecycle metadata.
#[derive(Deserialize)]
struct SeedPlaybookRequest {
    /// One playbook with {operation, approach, context, endorsed_names}.
    /// City + state are parsed from the operation text.
    operation: String,
    #[serde(default)]
    approach: String,
    #[serde(default)]
    context: String,
    // Worker names this playbook endorses for the fill.
    endorsed_names: Vec<String>,
    /// Append to the existing memory rather than replacing. Default true —
    /// seeding is a bootstrap/demo tool, not a rebuild substitute.
    #[serde(default = "default_true")]
    append: bool,
    /// Phase 25 — optional schema_fingerprint captured at seed time.
    /// When the underlying dataset's schema changes, any entry whose
    /// fingerprint doesn't match the new one is auto-retired via
    /// retire_on_schema_drift. Caller-provided so the producer (the
    /// scenario driver, the orchestrator) can pass the live fingerprint
    /// without the gateway needing a second catalogd round trip.
    #[serde(default)]
    schema_fingerprint: Option<String>,
    /// Phase 25 — optional hard expiry. RFC3339 timestamp. After this
    /// moment the entry is skipped during boost computation (not
    /// retired, just inactive). Useful for seasonal/temp contracts.
    #[serde(default)]
    valid_until: Option<String>,
    /// Phase 45 — optional external doc references captured at seal
    /// time. Each entry names a tool + version_seen; context7-driven
    /// drift check compares against current versions later. None or
    /// empty = no drift signal (never flagged).
    #[serde(default)]
    doc_refs: Option<Vec<playbook_memory::DocRef>>,
}
/// Bootstrap / test-only: inject a playbook entry directly into
/// `playbook_memory` without going through `successful_playbooks`. Useful
/// when the source dataset has stale or phantom entries (as the initial
/// staffing seed did — names that don't correspond to real workers), and
/// you want to demonstrate the feedback loop with a known-good fixture.
///
/// Production path is always `/rebuild` — this endpoint is for operators
/// who need to prime the memory before real playbooks accumulate.
///
/// Flow: acquire the embed semaphore → embed the entry text (same text
/// shape `rebuild` uses) → parse city/state from the operation →
/// derive a stable id (sha256 of timestamp + operation) → upsert
/// (append=true, the default) or replace-all (append=false).
async fn seed_playbook_memory(
    State(state): State<VectorState>,
    Json(req): Json<SeedPlaybookRequest>,
) -> impl IntoResponse {
    // Serialize embed calls to avoid concurrent socket collisions.
    let _permit = state.embed_semaphore.acquire().await.map_err(|e|
        (StatusCode::INTERNAL_SERVER_ERROR, format!("semaphore error: {e}")))?;
    // Embed the entry through the same text shape `rebuild` uses so
    // similarity math is comparable across seed + real entries.
    let tmp_entry = playbook_memory::PlaybookEntry {
        operation: req.operation.clone(),
        approach: req.approach.clone(),
        context: req.context.clone(),
        timestamp: chrono::Utc::now().to_rfc3339(),
        endorsed_names: req.endorsed_names.clone(),
        ..Default::default()
    };
    let text = format!(
        "{} | {} | {} | fills: {}",
        tmp_entry.operation, tmp_entry.approach, tmp_entry.context,
        tmp_entry.endorsed_names.join(", "),
    );
    let resp = match state.ai_client.embed(EmbedRequest { texts: vec![text], model: None }).await {
        Ok(r) => r,
        Err(e) => return Err((StatusCode::BAD_GATEWAY, format!("embed seed: {e}"))),
    };
    if resp.embeddings.is_empty() {
        return Err((StatusCode::BAD_GATEWAY, "embed returned nothing".into()));
    }
    // Narrow the embedding to f32 for storage (entries hold Vec<f32>).
    let emb: Vec<f32> = resp.embeddings[0].iter().map(|&x| x as f32).collect();
    // Parse city/state from the operation ("fill: Role xN in City, ST").
    // Parser lives in playbook_memory::rebuild — expose via a tiny helper
    // or inline the same logic here; duplicated briefly since this seed
    // path is stable but infrequently called.
    let (city, state_) = {
        let after_in = req.operation.split(" in ").nth(1).unwrap_or("");
        let mut parts = after_in.splitn(2, ',');
        let city = parts.next().map(|s| s.trim().to_string()).filter(|s| !s.is_empty());
        let state = parts.next().map(|s| s.trim().chars().take_while(|c| c.is_ascii_alphabetic()).collect::<String>()).filter(|s| !s.is_empty());
        (city, state)
    };
    if city.is_none() || state_.is_none() {
        return Err((StatusCode::BAD_REQUEST,
            "operation must match 'fill: Role xN in City, ST' shape".into()));
    }
    // Stable id: hash of timestamp + operation. Callers get the id back
    // so they can reference it in citations.
    let ts = chrono::Utc::now().to_rfc3339();
    use sha2::{Digest, Sha256};
    let mut h = Sha256::new();
    h.update(ts.as_bytes());
    h.update(b"|");
    h.update(req.operation.as_bytes());
    let bytes = h.finalize();
    // First 8 digest bytes, hex-encoded, under a "pb-seed-" namespace.
    let pid = format!("pb-seed-{}", bytes.iter().take(8).map(|b| format!("{b:02x}")).collect::<String>());
    let new_entry = playbook_memory::PlaybookEntry {
        playbook_id: pid.clone(),
        operation: req.operation,
        approach: req.approach,
        context: req.context,
        timestamp: ts,
        endorsed_names: req.endorsed_names,
        city, state: state_,
        embedding: Some(emb),
        // Phase 25 — seed request may carry a fingerprint; if not, we
        // default to None and the entry degrades to "no expiry signal"
        // (never auto-retired on drift, but manual retirement still
        // works). valid_until + retired_at start None.
        schema_fingerprint: req.schema_fingerprint.clone(),
        valid_until: req.valid_until.clone(),
        // Phase 45 — seed request may also carry doc_refs; defaults
        // empty so pre-Phase-45 callers still work and the entry
        // degrades to "no drift signal" (never flagged).
        doc_refs: req.doc_refs.clone().unwrap_or_default(),
        ..Default::default()
    };
    // Phase 26 — when append=true (default), route through upsert so
    // same-day re-seeds of the same operation merge instead of
    // appending duplicates. When append=false, retain the old
    // replace-all semantics for callers that want a hard reset.
    if req.append {
        match state.playbook_memory.upsert_entry(new_entry).await {
            Ok(outcome) => {
                let entries_after = state.playbook_memory.entry_count().await;
                Ok(Json(serde_json::json!({
                    "outcome": outcome,
                    "entries_after": entries_after,
                })))
            }
            Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("upsert: {e}"))),
        }
    } else {
        if let Err(e) = state.playbook_memory.set_entries(vec![new_entry]).await {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("persist: {e}")));
        }
        Ok(Json(serde_json::json!({
            "outcome": { "mode": "replaced", "playbook_id": pid },
            "entries_after": state.playbook_memory.entry_count().await,
        })))
    }
}
async fn rebuild_playbook_memory(
State(state): State<VectorState>,
) -> impl IntoResponse {
match playbook_memory::rebuild(
&state.playbook_memory,
&state.ai_client,
&state.catalog,
&state.bucket_registry,
).await {
Ok(report) => Ok(Json(report)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// Path 2 foundation — dump in-memory playbook_memory state to a fresh
// `successful_playbooks_live` dataset. Cheap to call (writes one parquet,
// updates one manifest), so /log can call it after every seed to keep the
// SQL-queryable surface honest without the destructive REPLACE bug that
// /ingest/file has.
async fn persist_playbook_memory_sql(
State(state): State<VectorState>,
) -> impl IntoResponse {
match playbook_memory::persist_to_sql(&state.playbook_memory, &state.catalog).await {
Ok(report) => Ok(Json(report)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// Request body for the Path 2 pattern-discovery endpoint.
#[derive(Deserialize)]
struct PatternsRequest {
    // Query text forwarded to playbook_memory::discover_patterns.
    query: String,
    // How many similar playbooks to examine; defaults to 10.
    #[serde(default = "default_pattern_k")]
    top_k_playbooks: usize,
    /// Minimum frequency (0.0-1.0) for a trait to make the report.
    /// Default 0.4 — at least 40% of examined workers must share it.
    #[serde(default = "default_pattern_min_freq")]
    min_trait_frequency: f32,
}
/// Default playbook-examination count for pattern discovery.
fn default_pattern_k() -> usize {
    10
}

/// Default minimum trait frequency (40% of examined workers).
fn default_pattern_min_freq() -> f32 {
    0.4
}
// Path 2 — meta-index discovery surface. "What did past similar fills
// have in common that I didn't ask about?" — surfaces signals like
// recurring certifications, skill clusters, archetype tendencies.
async fn discover_playbook_patterns(
State(state): State<VectorState>,
Json(req): Json<PatternsRequest>,
) -> impl IntoResponse {
match playbook_memory::discover_patterns(
&state.playbook_memory,
&state.ai_client,
&state.catalog,
&state.bucket_registry,
&req.query,
req.top_k_playbooks,
req.min_trait_frequency,
).await {
Ok(report) => Ok(Json(report)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// Request body for marking endorsed workers as failed on a fill.
#[derive(Deserialize)]
struct MarkFailedRequest {
    /// Operation text, same shape as seed: "fill: Role xN in City, ST"
    operation: String,
    /// Names of workers who didn't deliver on the fill.
    failed_names: Vec<String>,
    /// Short reason (no-show, fired, unreliable). Stored verbatim.
    #[serde(default)]
    reason: String,
}
/// POST handler: record failure feedback for named workers on a fill.
/// City/state are parsed from the operation text (same shape the seed
/// endpoint accepts); one FailureRecord is stored per failed name.
async fn mark_playbook_failed(
    State(state): State<VectorState>,
    Json(req): Json<MarkFailedRequest>,
) -> impl IntoResponse {
    // Parse "fill: Role xN in City, ST" — mirrors the seed parser.
    let location = req.operation.split(" in ").nth(1).unwrap_or("");
    let (city_raw, state_raw) = location.split_once(',').unwrap_or((location, ""));
    let city = Some(city_raw.trim().to_string()).filter(|s| !s.is_empty());
    let state_ = {
        let code: String = state_raw.trim().chars()
            .take_while(|c| c.is_ascii_alphabetic())
            .collect();
        (!code.is_empty()).then_some(code)
    };
    let (Some(city), Some(state_code)) = (city, state_) else {
        return Err((StatusCode::BAD_REQUEST,
            "operation must match 'fill: Role xN in City, ST' shape".into()));
    };
    // One timestamped record per failed name.
    let ts = chrono::Utc::now().to_rfc3339();
    let records: Vec<playbook_memory::FailureRecord> = req.failed_names.iter()
        .map(|n| playbook_memory::FailureRecord {
            city: city.clone(), state: state_code.clone(), name: n.clone(),
            reason: req.reason.clone(), timestamp: ts.clone(),
        })
        .collect();
    match state.playbook_memory.mark_failures(records).await {
        Ok(added) => Ok(Json(serde_json::json!({ "added": added, "city": city, "state": state_code }))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
/// GET handler: summarize the in-memory playbook store — entry count,
/// endorsement totals, embedding coverage, and a 3-entry sample.
async fn playbook_memory_stats(
    State(state): State<VectorState>,
) -> impl IntoResponse {
    let entries = state.playbook_memory.snapshot().await;
    let total_endorsed: usize = entries.iter().map(|e| e.endorsed_names.len()).sum();
    let with_embeddings = entries.iter().filter(|e| e.embedding.is_some()).count();
    let sample: Vec<_> = entries.iter().take(3).map(|e| serde_json::json!({
        "id": e.playbook_id,
        "operation": e.operation,
        "city": e.city,
        "state": e.state,
        "endorsed": e.endorsed_names,
    })).collect();
    Json(serde_json::json!({
        "entries": entries.len(),
        "total_names_endorsed": total_endorsed,
        "entries_with_embeddings": with_embeddings,
        "sample": sample,
    }))
}
/// Request body for the Phase 25 retirement endpoint — see
/// `retire_playbook_memory` for the two accepted mode shapes.
#[derive(Deserialize)]
struct RetirePlaybookRequest {
    /// Retire by playbook_id — exact match, single entry. Used for
    /// manual operator retirement via the UI.
    #[serde(default)]
    playbook_id: Option<String>,
    /// Retire by scope — city + state required, with a fingerprint
    /// that entries must match to survive. Fingerprint mismatch → retire.
    /// Use when a schema migration produces a new fingerprint and
    /// historical playbooks need to be auto-retired.
    #[serde(default)]
    city: Option<String>,
    #[serde(default)]
    state: Option<String>,
    #[serde(default)]
    current_schema_fingerprint: Option<String>,
    /// Human-readable reason stored on the retired entry.
    reason: String,
}
/// Phase 25 retirement endpoint. Two request shapes:
/// - {playbook_id, reason} → retire exactly one entry;
/// - {city, state, current_schema_fingerprint, reason} → retire every
///   in-scope entry whose fingerprint differs from the current one.
/// Anything else is a 400.
async fn retire_playbook_memory(
    State(state): State<VectorState>,
    Json(req): Json<RetirePlaybookRequest>,
) -> impl IntoResponse {
    // Mode 1 — exact-id retirement takes precedence when supplied.
    if let Some(id) = &req.playbook_id {
        let found = state.playbook_memory.retire_one(id, &req.reason).await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
        let retired = if found { 1 } else { 0 };
        return Ok(Json(serde_json::json!({ "mode": "by_id", "retired": retired })));
    }
    // Mode 2 — scope retirement on schema drift.
    match (&req.city, &req.state, &req.current_schema_fingerprint) {
        (Some(city), Some(state_code), Some(fp)) => {
            let retired = state.playbook_memory
                .retire_on_schema_drift(city, state_code, fp, &req.reason).await
                .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
            Ok(Json(serde_json::json!({
                "mode": "schema_drift", "retired": retired,
                "city": city, "state": state_code,
            })))
        }
        _ => Err((StatusCode::BAD_REQUEST,
            "supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into())),
    }
}
/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/check/{id}
///
/// Iterates the playbook's `doc_refs`, asks the context7 bridge whether
/// each one drifted against the recorded snippet_hash. If any tool
/// returned `drifted: true`, stamps `doc_drift_flagged_at` on the
/// entry — which excludes it from boost (via the filter in
/// `compute_boost_for_filtered_with_role`) until a human reviews and
/// resolves.
///
/// Unknown outcomes (bridge down, tool not in context7, no snippet
/// hash) are explicitly NOT enough to flag. Only a positive drifted=true
/// from the bridge flips the flag.
async fn check_doc_drift(
    State(state): State<VectorState>,
    axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
    // 404 when the id isn't in memory at all.
    let Some(entry) = state.playbook_memory.get_entry(&id).await else {
        return Err((StatusCode::NOT_FOUND, format!("playbook not found: {id}")));
    };
    // Nothing to check — report a benign no-op rather than an error.
    if entry.doc_refs.is_empty() {
        return Ok(Json(serde_json::json!({
            "playbook_id": id,
            "checked_tools": [],
            "drifted": false,
            "flagged": false,
            "reason": "entry has no doc_refs — nothing to check",
        })));
    }
    let results = check_all_refs(&DriftCheckerConfig::default(), &entry.doc_refs).await;
    // Accumulate the per-tool report and the any-drifted flag in one pass.
    let mut per_tool: Vec<serde_json::Value> = Vec::with_capacity(results.len());
    let mut any_drifted = false;
    for r in &results {
        let (drifted, current, src, reason) = match &r.outcome {
            DriftOutcome::Drifted { current_snippet_hash, source_url } => {
                any_drifted = true;
                (true, Some(current_snippet_hash.clone()), source_url.clone(), None)
            }
            DriftOutcome::Unchanged => (false, None, None, None),
            DriftOutcome::Unknown { reason } => (false, None, None, Some(reason.clone())),
        };
        per_tool.push(serde_json::json!({
            "tool": r.tool,
            "version_seen": r.version_seen,
            "drifted": drifted,
            "current_snippet_hash": current,
            "source_url": src,
            "unknown_reason": reason,
        }));
    }
    // Only a positive drift verdict flips the flag; Unknown never does.
    let flagged = if !any_drifted {
        false
    } else {
        state.playbook_memory.flag_doc_drift(&id).await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("flag: {e}")))?
    };
    Ok(Json(serde_json::json!({
        "playbook_id": id,
        "checked_tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
        "drifted": any_drifted,
        "flagged": flagged,
        "per_tool": per_tool,
    })))
}
/// Phase 45 closure (2026-04-27) — POST /playbook_memory/doc_drift/scan
///
/// Iterates all active playbooks (non-retired, has doc_refs), runs
/// drift check against context7 for each, flags drifted entries via
/// PlaybookMemory::flag_doc_drift, and appends a row to
/// data/_kb/doc_drift_corrections.jsonl for each drift detected.
///
/// Returns aggregate stats so an operator can see at-a-glance how
/// many playbooks drifted and which tools moved.
///
/// Honors entries already flagged: they're counted in `already_flagged`
/// (no double-flag, no duplicate corrections.jsonl row).
async fn scan_doc_drift(
    State(state): State<VectorState>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
    let entries = state.playbook_memory.snapshot().await;
    let now = chrono::Utc::now().to_rfc3339();
    let cfg = DriftCheckerConfig::default();
    // Aggregate counters reported in the response body.
    let mut scanned = 0usize;
    let mut newly_flagged = 0usize;
    let mut already_flagged = 0usize;
    let mut skipped_no_refs = 0usize;
    let mut skipped_retired = 0usize;
    let mut tool_counts: std::collections::HashMap<String, usize> = Default::default();
    let mut corrections_rows: Vec<String> = vec![];
    for e in entries.iter() {
        if e.retired_at.is_some() { skipped_retired += 1; continue; }
        if e.doc_refs.is_empty() { skipped_no_refs += 1; continue; }
        // Flagged-but-unreviewed entries are skipped entirely so a
        // rescan never double-flags or duplicates a corrections row.
        if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
            already_flagged += 1;
            continue;
        }
        scanned += 1;
        let results = check_all_refs(&cfg, &e.doc_refs).await;
        let drifted_tools: Vec<&str> = results.iter()
            .filter(|r| matches!(r.outcome, DriftOutcome::Drifted { .. }))
            .map(|r| r.tool.as_str())
            .collect();
        if drifted_tools.is_empty() { continue; }
        // Flag the entry. Best-effort: a flag failure is treated as
        // "not newly flagged" rather than aborting the whole scan.
        let flagged = state.playbook_memory.flag_doc_drift(&e.playbook_id).await
            .unwrap_or(false);
        if flagged { newly_flagged += 1; }
        for t in &drifted_tools {
            *tool_counts.entry(t.to_string()).or_insert(0) += 1;
        }
        // Build corrections.jsonl row — one per drifted playbook with
        // the tool list inline. Downstream consumers (overview model,
        // operator dashboard) read this to decide reviews + revisions.
        let row = serde_json::json!({
            "playbook_id": e.playbook_id,
            "scanned_at": now,
            "drifted_tools": drifted_tools,
            "per_tool": results.iter().map(|r| {
                let (drifted, current, src) = match &r.outcome {
                    DriftOutcome::Drifted { current_snippet_hash, source_url } =>
                        (true, Some(current_snippet_hash.clone()), source_url.clone()),
                    _ => (false, None, None),
                };
                serde_json::json!({
                    "tool": r.tool, "version_seen": r.version_seen,
                    "drifted": drifted, "current_snippet_hash": current, "source_url": src,
                })
            }).collect::<Vec<_>>(),
            "recommended_action": "review-and-resolve",
        });
        corrections_rows.push(row.to_string());
    }
    // Persist corrections.jsonl row(s) for the operator/overview model.
    // NOTE(review): output path is hard-coded to this host's layout —
    // presumably fine for the single-host deployment; confirm before
    // containerizing.
    if !corrections_rows.is_empty() {
        let path = std::path::PathBuf::from("/home/profit/lakehouse/data/_kb/doc_drift_corrections.jsonl");
        if let Some(parent) = path.parent() {
            if let Err(e) = tokio::fs::create_dir_all(parent).await {
                tracing::warn!(target: "vectord.doc_drift", "create_dir_all {parent:?}: {e}");
            }
        }
        let body = corrections_rows.join("\n") + "\n";
        // BUGFIX: previously the file was opened twice — once as an
        // error probe (handle immediately dropped) and again in the
        // else-branch to actually write. Besides the wasted syscall,
        // the probe/reopen gap was a race: the second open could fail
        // after the first succeeded. Open exactly once and write
        // through that handle.
        use tokio::io::AsyncWriteExt;
        match tokio::fs::OpenOptions::new().create(true).append(true).open(&path).await {
            Ok(mut f) => {
                if let Err(e) = f.write_all(body.as_bytes()).await {
                    tracing::warn!(target: "vectord.doc_drift", "append {path:?}: {e}");
                }
            }
            Err(e) => tracing::warn!(target: "vectord.doc_drift", "open {path:?}: {e}"),
        }
    }
    Ok(Json(serde_json::json!({
        "scanned_at": now,
        "scanned": scanned,
        "newly_flagged": newly_flagged,
        "already_flagged": already_flagged,
        "skipped_retired": skipped_retired,
        "skipped_no_refs": skipped_no_refs,
        "drifted_by_tool": tool_counts,
        "corrections_written": corrections_rows.len(),
    })))
}
/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/resolve/{id}
///
/// Human-in-the-loop re-admission. Stamps `doc_drift_reviewed_at`.
/// Idempotent: returns `resolved: false` if nothing changed (entry
/// wasn't flagged, already reviewed, or doesn't exist).
async fn resolve_doc_drift(
    State(state): State<VectorState>,
    axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    match state.playbook_memory.resolve_doc_drift(&id).await {
        Ok(resolved) => Ok(Json(serde_json::json!({
            "playbook_id": id,
            "resolved": resolved,
        }))),
        // Persistence-layer failures surface as 500 with context.
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("resolve: {e}"))),
    }
}
/// Phase 27 — request body for `POST /playbook_memory/revise`. Same
/// shape as a seed request minus `append` (revise is always
/// append-semantics for a specific parent) plus `parent_id`. The new
/// version's `playbook_id` is derived deterministically so callers get
/// the same id back from repeated revises with identical content —
/// useful for idempotent retry paths.
#[derive(Deserialize)]
struct RevisePlaybookRequest {
    /// Id of the playbook being revised. Must be revisable (the handler
    /// 400s on a retired or already-superseded parent).
    parent_id: String,
    /// Expected to match the 'fill: Role xN in City, ST' shape — the
    /// handler re-parses city/state out of this string and 400s otherwise.
    operation: String,
    approach: String,
    context: String,
    /// Names carried into the embedded text ("fills: …") and stored on
    /// the new entry.
    endorsed_names: Vec<String>,
    #[serde(default)]
    schema_fingerprint: Option<String>,
    #[serde(default)]
    valid_until: Option<String>,
    /// Phase 45 — updated doc references. Typically a revise happens
    /// BECAUSE docs drifted; pass the new versions seen so the revised
    /// entry starts with fresh drift signal.
    #[serde(default)]
    doc_refs: Option<Vec<playbook_memory::DocRef>>,
}
/// Phase 27 — create a new version of an existing playbook. The parent
/// is marked superseded; the new entry inherits the chain via
/// `parent_id` and carries `version = parent.version + 1`. Errors with
/// 400 on a retired or already-superseded parent (must revise the tip
/// of the chain). Embeds the new text through the same shape as
/// `/seed` so cosine similarity stays comparable across rebuild + seed
/// + revise entries.
async fn revise_playbook_memory(
    State(state): State<VectorState>,
    Json(req): Json<RevisePlaybookRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    // "op | approach | context | fills: names" — the /seed-compatible
    // embedding text shape promised by the docstring.
    let text = format!(
        "{} | {} | {} | fills: {}",
        req.operation, req.approach, req.context,
        req.endorsed_names.join(", "),
    );
    // Embed before any mutation — a bridge failure leaves memory untouched.
    let resp = state.ai_client.embed(EmbedRequest { texts: vec![text], model: None })
        .await
        .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed revise: {e}")))?;
    if resp.embeddings.is_empty() {
        return Err((StatusCode::BAD_GATEWAY, "embed returned nothing".into()));
    }
    let emb: Vec<f32> = resp.embeddings[0].iter().map(|&x| x as f32).collect();
    // Parse "… in City, ST" out of the operation string. State keeps
    // only the leading ASCII-alphabetic run (strips trailing clauses).
    let (city, state_) = {
        let after_in = req.operation.split(" in ").nth(1).unwrap_or("");
        let mut parts = after_in.splitn(2, ',');
        let city = parts.next().map(|s| s.trim().to_string()).filter(|s| !s.is_empty());
        let state = parts.next()
            .map(|s| s.trim().chars().take_while(|c| c.is_ascii_alphabetic()).collect::<String>())
            .filter(|s| !s.is_empty());
        (city, state)
    };
    if city.is_none() || state_.is_none() {
        return Err((StatusCode::BAD_REQUEST,
            "operation must match 'fill: Role xN in City, ST' shape".into()));
    }
    // Phase 27 — deterministic pid derived ONLY from content-shaped
    // inputs (parent_id + operation + approach + context + sorted
    // endorsed_names). Excluding wall-clock ts means two revise calls
    // with identical payloads produce the same pid, which is the
    // contract the docstring promises. On retry the caller gets back
    // the same id and the pre-flight idempotency check below short-
    // circuits to the existing entry instead of re-appending.
    let mut names_sorted = req.endorsed_names.clone();
    names_sorted.sort();
    use sha2::{Digest, Sha256};
    let mut h = Sha256::new();
    h.update(req.parent_id.as_bytes()); h.update(b"|");
    h.update(req.operation.as_bytes()); h.update(b"|");
    h.update(req.approach.as_bytes()); h.update(b"|");
    h.update(req.context.as_bytes()); h.update(b"|");
    h.update(names_sorted.join(",").as_bytes());
    let bytes = h.finalize();
    // First 8 digest bytes, hex-encoded → "pb-rev-<16 hex chars>".
    let pid = format!("pb-rev-{}", bytes.iter().take(8).map(|b| format!("{b:02x}")).collect::<String>());
    // Idempotency short-circuit — if this exact pid already exists in
    // memory (from a prior successful revise with the same content),
    // return it directly rather than re-appending or 400ing on the
    // superseded-parent rejection. Walks the parent's chain via
    // history() because the parent may itself have been superseded by
    // our prior successful call.
    let chain = state.playbook_memory.history(&req.parent_id).await;
    if let Some(existing) = chain.iter().find(|e| e.playbook_id == pid) {
        return Ok(Json(serde_json::json!({
            "outcome": {
                "parent_id": req.parent_id,
                "parent_version": existing.version.saturating_sub(1),
                "new_playbook_id": existing.playbook_id,
                "new_version": existing.version,
                "superseded_at": existing.superseded_at.clone().unwrap_or_default(),
                "idempotent_return": true,
            },
            "entries_after": state.playbook_memory.entry_count().await,
        })));
    }
    let ts = chrono::Utc::now().to_rfc3339();
    let new_entry = playbook_memory::PlaybookEntry {
        playbook_id: pid.clone(),
        operation: req.operation,
        approach: req.approach,
        context: req.context,
        timestamp: ts,
        endorsed_names: req.endorsed_names,
        city, state: state_,
        embedding: Some(emb),
        schema_fingerprint: req.schema_fingerprint,
        valid_until: req.valid_until,
        // Phase 45 — doc_refs may be provided on revise too.
        doc_refs: req.doc_refs.clone().unwrap_or_default(),
        // revise_entry overwrites version / parent_id / supersession
        // from the parent; other fields keep defaults.
        ..Default::default()
    };
    // revise_entry enforces the tip-of-chain rule; its rejection
    // surfaces as the 400 promised in the docstring.
    let outcome = state.playbook_memory.revise_entry(&req.parent_id, new_entry)
        .await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    Ok(Json(serde_json::json!({
        "outcome": outcome,
        "entries_after": state.playbook_memory.entry_count().await,
    })))
}
/// Phase 27 — return the full version chain containing `playbook_id`,
/// ordered root → tip. 404 if the id isn't present. The walker is
/// cycle-safe by construction (visited set per direction).
async fn playbook_memory_history(
    State(state): State<VectorState>,
    Path(playbook_id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    let chain = state.playbook_memory.history(&playbook_id).await;
    // An empty chain means the id was never seen → 404.
    match chain.len() {
        0 => Err((StatusCode::NOT_FOUND, format!("no playbook with id '{playbook_id}'"))),
        n => Ok(Json(serde_json::json!({
            "playbook_id": playbook_id,
            "versions": n,
            "chain": chain,
        }))),
    }
}
/// Phase 25 status endpoint — reports retirement counts so dashboards
/// can show "N playbooks retired (12 from 2026-05 schema migration)".
/// Phase 27 added `superseded` as a distinct counter.
async fn playbook_memory_status(
    State(state): State<VectorState>,
) -> impl IntoResponse {
    let (total, retired, superseded, failures) = state.playbook_memory.status_counts().await;
    // `active` = entries eligible for boost. Retired and superseded are
    // distinct exclusion reasons; subtract both. An entry can in principle
    // be both retired AND superseded (e.g. revised then retired) so
    // saturating_sub guards against underflow if that pathological case
    // ever lands.
    let active = total.saturating_sub(retired + superseded);
    Json(serde_json::json!({
        "total": total,
        "retired": retired,
        "superseded": superseded,
        "active": active,
        "failures": failures,
    }))
}
/// Replay a stored harness query set against a Lance index and report
/// per-query recall@k plus aggregate latency percentiles. Requires the
/// harness to already carry ground truth (built by a prior /hnsw/trial
/// run) — 400 otherwise; unknown harness → 404.
async fn lance_recall_harness(
    State(state): State<VectorState>,
    Path(index_name): Path<String>,
    Json(req): Json<LanceRecallRequest>,
) -> impl IntoResponse {
    let t0 = std::time::Instant::now();
    let harness_set = state.harness_store.load_for_index(&index_name, &req.harness).await
        .map_err(|e| (StatusCode::NOT_FOUND, format!("harness: {e}")))?;
    if !harness_set.ground_truth_built {
        return Err((StatusCode::BAD_REQUEST,
            "harness has no ground truth — run a regular /hnsw/trial first to compute it".into()));
    }
    let lance_store = state.lance.store_for(&index_name).await
        .map_err(|e| (StatusCode::BAD_REQUEST, e))?;
    let k = req.top_k;
    let mut per_query = Vec::with_capacity(harness_set.queries.len());
    let mut latencies: Vec<f32> = Vec::with_capacity(harness_set.queries.len());
    let mut recalls: Vec<f32> = Vec::with_capacity(harness_set.queries.len());
    for q in &harness_set.queries {
        // Queries missing an embedding or ground truth are silently
        // skipped — they cannot be scored.
        let qv = match &q.query_embedding {
            Some(v) => v,
            None => continue,
        };
        let gt = match &q.ground_truth {
            Some(gt) => gt,
            None => continue,
        };
        // Per-query wall-clock covers only the Lance search itself.
        let qt0 = std::time::Instant::now();
        let hits = lance_store.search(
            qv,
            k,
            Some(req.nprobes.unwrap_or(LANCE_DEFAULT_NPROBES)),
            Some(req.refine_factor.unwrap_or(LANCE_DEFAULT_REFINE_FACTOR)),
        ).await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("search: {e}")))?;
        let lat_us = qt0.elapsed().as_micros() as f32;
        let predicted: Vec<String> = hits.iter().map(|h| h.doc_id.clone()).collect();
        let recall = harness::recall_at_k(&predicted, gt, k);
        per_query.push(LanceRecallQuery {
            query_id: q.id.clone(),
            recall,
            latency_us: lat_us,
            hits_returned: hits.len(),
        });
        latencies.push(lat_us);
        recalls.push(recall);
    }
    // Mean over scored queries only; 0.0 when none were scoreable.
    let mean_recall = if recalls.is_empty() { 0.0 } else {
        recalls.iter().sum::<f32>() / recalls.len() as f32
    };
    latencies.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    // Nearest-rank percentile over the sorted latencies, clamped in-bounds.
    let p = |pct: f32| -> f32 {
        if latencies.is_empty() { return 0.0; }
        let idx = ((latencies.len() as f32 - 1.0) * pct).round() as usize;
        latencies[idx.min(latencies.len() - 1)]
    };
    Ok(Json(LanceRecallResult {
        index_name,
        harness: req.harness,
        queries: per_query.len(),
        top_k: k,
        mean_recall,
        per_query,
        latency_p50_us: p(0.50),
        latency_p95_us: p(0.95),
        total_duration_secs: t0.elapsed().as_secs_f32(),
    }))
}
/// Build a scalar btree index on a column (typically `doc_id`). Makes
/// filter-pushdown queries O(log N) instead of full-fragment scan.
async fn lance_build_scalar_index(
State(state): State<VectorState>,
Path((index_name, column)): Path<(String, String)>,
) -> impl IntoResponse {
let lance_store = state.lance.store_for(&index_name).await
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
match lance_store.build_scalar_index(&column).await {
Ok(stats) => Ok(Json(stats)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// ─── Pathway memory handlers ──────────────────────────────────────────
//
// Thin wrappers around pathway_memory::PathwayMemory. HTTP surface is
// deliberately small — four endpoints cover the full lifecycle:
// insert at end-of-review, query before running the ladder,
// record_replay after a hot-swap, and stats for the VCP UI.
/// Body for the pathway hot-swap query endpoint — identifies the task
/// context and carries the query embedding.
#[derive(Deserialize)]
struct PathwayQueryRequest {
    // Task classification key (scopes the lookup).
    task_class: String,
    // File the task targets (scopes the lookup).
    file_path: String,
    // Optional finer-grained signal-class filter.
    signal_class: Option<String>,
    // Query embedding — presumably matched by vector similarity inside
    // query_hot_swap; verify against pathway_memory. TODO confirm.
    query_vec: Vec<f32>,
}
/// Insert a pathway trace at end-of-review. 500 on store failure.
async fn pathway_insert(
    State(state): State<VectorState>,
    Json(trace): Json<pathway_memory::PathwayTrace>,
) -> impl IntoResponse {
    state
        .pathway_memory
        .insert(trace)
        .await
        .map(|()| Json(json!({"ok": true})))
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}
async fn pathway_query(
State(state): State<VectorState>,
Json(req): Json<PathwayQueryRequest>,
) -> impl IntoResponse {
let cand = state
.pathway_memory
.query_hot_swap(
&req.task_class,
&req.file_path,
req.signal_class.as_deref(),
&req.query_vec,
)
.await;
// 200 with null candidate means "no hot-swap"; this is a normal
// path, not an error — callers should proceed with the full ladder.
Json(json!({ "candidate": cand }))
}
/// Body for the replay-outcome endpoint.
#[derive(Deserialize)]
struct PathwayReplayRequest {
    // Id of the pathway trace that was replayed.
    pathway_id: String,
    // Whether the hot-swap replay succeeded.
    succeeded: bool,
}
/// Record the outcome of a hot-swap replay. Unknown pathway_id → 404.
async fn pathway_record_replay(
    State(state): State<VectorState>,
    Json(req): Json<PathwayReplayRequest>,
) -> impl IntoResponse {
    let outcome = state
        .pathway_memory
        .record_replay_outcome(&req.pathway_id, req.succeeded)
        .await;
    match outcome {
        Ok(()) => Ok(Json(json!({"ok": true}))),
        Err(e) => Err((StatusCode::NOT_FOUND, e)),
    }
}
/// Aggregate pathway-memory stats for the VCP UI — straight
/// pass-through of PathwayMemory::stats() as JSON.
async fn pathway_stats(State(state): State<VectorState>) -> impl IntoResponse {
    Json(state.pathway_memory.stats().await)
}
/// Body for the bug-fingerprints lookup endpoint.
#[derive(Deserialize)]
struct PathwayBugFingerprintsRequest {
    // Task classification key (scopes the lookup).
    task_class: String,
    // File the task targets (scopes the lookup).
    file_path: String,
    // Optional finer-grained signal-class filter.
    signal_class: Option<String>,
    // Max fingerprints returned; the handler defaults this to 5.
    limit: Option<usize>,
}
/// Return known bug fingerprints for a task/file scope (default cap: 5).
async fn pathway_bug_fingerprints(
    State(state): State<VectorState>,
    Json(req): Json<PathwayBugFingerprintsRequest>,
) -> impl IntoResponse {
    let limit = req.limit.unwrap_or(5);
    let fps = state
        .pathway_memory
        .bug_fingerprints_for(
            &req.task_class,
            &req.file_path,
            req.signal_class.as_deref(),
            limit,
        )
        .await;
    Json(json!({ "fingerprints": fps }))
}
// ─── Mem0 ops endpoints (J 2026-04-25) ───
/// Upsert a pathway trace; echoes the store's upsert outcome.
async fn pathway_upsert(
    State(state): State<VectorState>,
    Json(trace): Json<pathway_memory::PathwayTrace>,
) -> impl IntoResponse {
    state
        .pathway_memory
        .upsert(trace)
        .await
        .map(|outcome| Json(json!({"ok": true, "outcome": outcome})))
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}
/// Body for the pathway retire endpoint.
#[derive(Deserialize)]
struct PathwayRetireRequest {
    // Uid of the trace to retire.
    trace_uid: String,
    // Human-readable retirement reason.
    reason: String,
}
/// Retire a pathway trace by uid; `retired` reports what was touched.
async fn pathway_retire(
    State(state): State<VectorState>,
    Json(req): Json<PathwayRetireRequest>,
) -> impl IntoResponse {
    state
        .pathway_memory
        .retire(&req.trace_uid, &req.reason)
        .await
        .map(|touched| Json(json!({"ok": true, "retired": touched})))
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}
/// Body for the pathway revise endpoint.
#[derive(Deserialize)]
struct PathwayReviseRequest {
    // Uid of the trace being superseded.
    parent_trace_uid: String,
    // Replacement trace appended as the new chain tip.
    new_trace: pathway_memory::PathwayTrace,
}
/// Revise a pathway trace: supersede the parent with `new_trace`.
async fn pathway_revise(
    State(state): State<VectorState>,
    Json(req): Json<PathwayReviseRequest>,
) -> impl IntoResponse {
    state
        .pathway_memory
        .revise(&req.parent_trace_uid, req.new_trace)
        .await
        .map(|outcome| Json(json!({"ok": true, "outcome": outcome})))
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}
/// Return the revision chain containing `trace_uid`. An empty chain is
/// reported as chain_len 0 rather than an error.
async fn pathway_history(
    State(state): State<VectorState>,
    axum::extract::Path(trace_uid): axum::extract::Path<String>,
) -> impl IntoResponse {
    let chain = state.pathway_memory.history(&trace_uid).await;
    let chain_len = chain.len();
    Json(json!({"trace_uid": trace_uid, "chain_len": chain_len, "chain": chain}))
}
#[cfg(test)]
mod extractor_tests {
    //! Unit tests for the filter-string extractors (`extract_target_geo`
    //! and `extract_target_role`), defined elsewhere in this file.
    use super::*;
    #[test]
    fn extract_target_geo_basic() {
        // Full filter shape: role + city + state + a CAST predicate.
        let f = "role = 'Welder' AND city = 'Toledo' AND state = 'OH' AND CAST(availability AS DOUBLE) > 0.5";
        assert_eq!(extract_target_geo(f), Some(("Toledo".into(), "OH".into())));
    }
    #[test]
    fn extract_target_geo_missing_state_returns_none() {
        // Geo extraction requires BOTH city and state; city alone → None.
        let f = "role = 'Welder' AND city = 'Toledo'";
        assert_eq!(extract_target_geo(f), None);
    }
    #[test]
    fn extract_target_geo_word_boundary() {
        // "civilian" contains "city" as a substring — must not match.
        // The real `city =` clause later in the filter still must.
        let f = "civilian_rank = 1 AND city = 'Toledo' AND state = 'OH'";
        assert_eq!(extract_target_geo(f), Some(("Toledo".into(), "OH".into())));
    }
    #[test]
    fn extract_target_role_basic() {
        let f = "role = 'Welder' AND city = 'Toledo'";
        assert_eq!(extract_target_role(f), Some("Welder".into()));
    }
    #[test]
    fn extract_target_role_none_when_absent() {
        // No `role =` clause at all → None.
        let f = "city = 'Toledo' AND state = 'OH'";
        assert_eq!(extract_target_role(f), None);
    }
    #[test]
    fn extract_target_role_multi_word() {
        // Quoted values with spaces must survive extraction intact.
        let f = "role = 'Warehouse Associate' AND city = 'Chicago'";
        assert_eq!(extract_target_role(f), Some("Warehouse Associate".into()));
    }
}