root 97a376482c Phase C: Decoupled embedding refresh
Implements the llms3.com-inspired pattern: embeddings refresh
asynchronously, decoupled from transactional row writes. New rows arrive;
ingest marks the vector index stale; a later refresh embeds only the
delta (doc_ids not already in the index).

Schema additions (DatasetManifest):
- last_embedded_at: Option<DateTime> - when the index was last refreshed
- embedding_stale_since: Option<DateTime> - set when data written, cleared on refresh
- embedding_refresh_policy: Option<RefreshPolicy> - Manual | OnAppend | Scheduled
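
A hedged sketch of the policy variants described above; the real enum lives
in shared::types, so the derives, the exact variant payload (a cron string on
Scheduled), and the helper function here are assumptions for illustration:

```rust
// Illustrative sketch of RefreshPolicy; shape is assumed, not the
// actual shared::types definition.
#[derive(Debug, Clone, PartialEq)]
pub enum RefreshPolicy {
    /// Operator triggers POST /vectors/refresh by hand.
    Manual,
    /// Declares intent to refresh after each append (trigger not yet wired).
    OnAppend,
    /// Declares a cron cadence; the scheduler runtime is out of MVP scope.
    Scheduled(String),
}

/// Hypothetical human-readable summary, e.g. for a stale-datasets listing.
pub fn describe(policy: &RefreshPolicy) -> String {
    match policy {
        RefreshPolicy::Manual => "manual".to_string(),
        RefreshPolicy::OnAppend => "on append".to_string(),
        RefreshPolicy::Scheduled(cron) => format!("scheduled ({cron})"),
    }
}
```

Only Manual has runtime behavior in this MVP; OnAppend and Scheduled record
operator intent.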

Ingest paths (pipeline::ingest_file + pg_stream) call
registry.mark_embeddings_stale after writing. No-op if the dataset has
never been embedded — stale semantics only kick in once last_embedded_at
is set.
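
The ingest-side hook reduces to a best-effort flag write after a successful
row write. A minimal sketch — the closure stands in for
registry.mark_embeddings_stale, and both the function name and the
swallow-the-error handling are illustrative assumptions, not the actual
pipeline wiring:

```rust
// Mark the dataset's embeddings stale after ingest. `mark_stale` is a
// stand-in for registry.mark_embeddings_stale; logging and swallowing
// the error (rather than propagating it) is an assumption here.
fn after_ingest(
    mark_stale: impl Fn(&str) -> Result<(), String>,
    dataset_name: &str,
) -> bool {
    match mark_stale(dataset_name) {
        Ok(()) => true,
        Err(e) => {
            eprintln!("could not mark '{dataset_name}' stale: {e}");
            false
        }
    }
}
```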

Refresh pipeline (vectord::refresh::refresh_index):
- Reads the dataset Parquet, extracts (doc_id, text) pairs
- Accepts Utf8 / Int32 / Int64 id columns (covers both CSV and pg schemas)
- Loads existing embeddings via EmbeddingCache (empty on first-time build)
- Filters to rows whose doc_id is NOT in the existing set
- Chunks (chunker::chunk_column), embeds via Ollama (batches of 32),
  writes combined index, clears stale flag
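
The delta filter above boils down to a set difference on doc_ids; a minimal
sketch, with function and parameter names that are illustrative rather than
the actual vectord::refresh API:

```rust
use std::collections::HashSet;

// Keep only rows whose doc_id is not yet embedded. The (doc_id, text)
// pairs come from the dataset Parquet; `already_embedded` stands in for
// the doc_id set loaded via EmbeddingCache.
fn delta_rows(
    rows: Vec<(String, String)>,
    already_embedded: &HashSet<String>,
) -> Vec<(String, String)> {
    rows.into_iter()
        .filter(|(doc_id, _)| !already_embedded.contains(doc_id))
        .collect()
}
```

On a first-time build the set is empty, so every row passes through; on an
idempotent second refresh every doc_id is already present and the result is
empty, which is why that path costs only the scan.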

Endpoints:
- POST /vectors/refresh/{dataset_name} - body {index_name, id_column,
  text_column, chunk_size?, overlap?}
- GET /vectors/stale - lists datasets whose embedding_stale_since is set

End-to-end verified on threat_intel (knowledge_base.threat_intel):
- Initial refresh: 20 rows -> 20 chunks -> embedded in 2.1s,
  last_embedded_at set
- Idempotent second refresh: 0 new docs -> 1.8ms (pure delta check)
- Re-ingest to 54 rows: mark_embeddings_stale fires -> stale_since set
- /vectors/stale surfaces threat_intel with timestamps + policy
- Delta refresh: 34 new docs embedded in 970ms (6x faster than full
  re-embed); stale_cleared = true

Not in MVP scope:
- UPDATE semantics (same doc_id, different content) - would need
  per-row content hashing
- OnAppend policy auto-trigger - the variant declares intent; the actual
  trigger wiring is deferred
- Scheduler runtime - the Scheduled(cron) variant declares the intent so
  operators can see which datasets expect what, but the cron itself is
  separate

Per ADR-019: when a profile switches to vector_backend=Lance, this
refresh path benefits — Lance's native append replaces our "read all +
rewrite" Parquet rebuild pattern. Current MVP works well enough at
~500-5K rows to validate the architecture; Lance unblocks the 5M+ case.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 03:00:43 -05:00


use shared::types::{
    DatasetId, DatasetManifest, ObjectRef, SchemaFingerprint,
    ColumnMeta, Lineage, FreshnessContract, RefreshPolicy, Sensitivity,
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use storaged::ops;
use object_store::ObjectStore;

/// Partial metadata update — only set fields are applied.
#[derive(Debug, Clone, Default, serde::Deserialize)]
pub struct MetadataUpdate {
    pub description: Option<String>,
    pub owner: Option<String>,
    pub sensitivity: Option<Sensitivity>,
    pub tags: Option<Vec<String>>,
    pub columns: Option<Vec<ColumnMeta>>,
    pub lineage: Option<Lineage>,
    pub freshness: Option<FreshnessContract>,
    pub row_count: Option<u64>,
    // Phase C embedding freshness
    pub embedding_refresh_policy: Option<RefreshPolicy>,
}

const MANIFEST_PREFIX: &str = "_catalog/manifests";

/// In-memory dataset registry backed by manifest persistence in object storage.
#[derive(Clone)]
pub struct Registry {
    datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>,
    store: Arc<dyn ObjectStore>,
}

impl Registry {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            datasets: Arc::new(RwLock::new(HashMap::new())),
            store,
        }
    }
    /// Rebuild in-memory index from persisted manifests on startup.
    pub async fn rebuild(&self) -> Result<usize, String> {
        let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?;
        let mut datasets = self.datasets.write().await;
        datasets.clear();
        for key in &keys {
            let data = ops::get(&self.store, key).await?;
            let manifest: DatasetManifest =
                serde_json::from_slice(&data).map_err(|e| e.to_string())?;
            datasets.insert(manifest.id.clone(), manifest);
        }
        let count = datasets.len();
        tracing::info!("catalog rebuilt: {count} datasets loaded");
        Ok(count)
    }

    /// Register a new dataset. Persists manifest to storage before updating memory.
    pub async fn register(
        &self,
        name: String,
        schema_fingerprint: SchemaFingerprint,
        objects: Vec<ObjectRef>,
    ) -> Result<DatasetManifest, String> {
        let now = chrono::Utc::now();
        let manifest = DatasetManifest {
            id: DatasetId::new(),
            name,
            schema_fingerprint,
            objects,
            created_at: now,
            updated_at: now,
            description: String::new(),
            owner: String::new(),
            sensitivity: None,
            columns: vec![],
            lineage: None,
            freshness: None,
            tags: vec![],
            row_count: None,
            last_embedded_at: None,
            embedding_stale_since: None,
            embedding_refresh_policy: None,
        };
        // Write-ahead: persist before in-memory update
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;
        let mut datasets = self.datasets.write().await;
        datasets.insert(manifest.id.clone(), manifest.clone());
        tracing::info!("registered dataset: {} ({})", manifest.name, manifest.id);
        Ok(manifest)
    }
    /// Update metadata on an existing dataset (owner, description, tags, sensitivity, etc.)
    pub async fn update_metadata(
        &self,
        name: &str,
        updates: MetadataUpdate,
    ) -> Result<DatasetManifest, String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets.values_mut()
            .find(|d| d.name == name)
            .ok_or_else(|| format!("dataset not found: {name}"))?;
        if let Some(desc) = updates.description { manifest.description = desc; }
        if let Some(owner) = updates.owner { manifest.owner = owner; }
        if let Some(sens) = updates.sensitivity { manifest.sensitivity = Some(sens); }
        if let Some(tags) = updates.tags { manifest.tags = tags; }
        if let Some(cols) = updates.columns { manifest.columns = cols; }
        if let Some(lineage) = updates.lineage { manifest.lineage = Some(lineage); }
        if let Some(freshness) = updates.freshness { manifest.freshness = Some(freshness); }
        if let Some(count) = updates.row_count { manifest.row_count = Some(count); }
        if let Some(policy) = updates.embedding_refresh_policy {
            manifest.embedding_refresh_policy = Some(policy);
        }
        manifest.updated_at = chrono::Utc::now();
        // Persist
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;
        let result = manifest.clone();
        Ok(result)
    }
    /// Get a dataset by ID.
    pub async fn get(&self, id: &DatasetId) -> Option<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets.get(id).cloned()
    }

    /// Get a dataset by name.
    pub async fn get_by_name(&self, name: &str) -> Option<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets.values().find(|d| d.name == name).cloned()
    }

    /// List all datasets.
    pub async fn list(&self) -> Vec<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets.values().cloned().collect()
    }
    /// Re-read the parquet object(s) for a dataset and repopulate `row_count`
    /// and `columns` from reality. Use this to repair manifests whose
    /// metadata was lost (e.g. migrated from a pre-Phase 10 catalog).
    ///
    /// Does NOT touch owner/description/sensitivity/lineage/tags — only
    /// the structural facts that parquet can tell us authoritatively.
    /// The existing `schema_fingerprint` is updated if the recomputed one
    /// differs; a warning is logged so drift is visible.
    pub async fn resync_from_parquet(&self, name: &str) -> Result<DatasetManifest, String> {
        use shared::arrow_helpers::{fingerprint_schema, parquet_to_record_batches};
        // Snapshot the target manifest so we don't hold the write lock during IO.
        let (id, objects, old_fp) = {
            let datasets = self.datasets.read().await;
            let m = datasets
                .values()
                .find(|d| d.name == name)
                .ok_or_else(|| format!("dataset not found: {name}"))?;
            (m.id.clone(), m.objects.clone(), m.schema_fingerprint.clone())
        };
        if objects.is_empty() {
            return Err(format!("dataset '{name}' has no object references to resync from"));
        }
        let mut total_rows: u64 = 0;
        let mut first_schema: Option<arrow::datatypes::SchemaRef> = None;
        for obj in &objects {
            let data = ops::get(&self.store, &obj.key).await
                .map_err(|e| format!("read {}: {e}", obj.key))?;
            let (schema, batches) = parquet_to_record_batches(&data)
                .map_err(|e| format!("parse {}: {e}", obj.key))?;
            let rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
            total_rows += rows;
            if first_schema.is_none() {
                first_schema = Some(schema);
            }
        }
        let schema = first_schema.ok_or("no schema recovered")?;
        let new_fp = fingerprint_schema(&schema);
        if new_fp != old_fp {
            tracing::warn!(
                "dataset '{}' schema fingerprint drift: {} -> {} (updating to match parquet reality)",
                name, old_fp.0, new_fp.0,
            );
        }
        let columns: Vec<ColumnMeta> = schema
            .fields()
            .iter()
            .map(|f| ColumnMeta {
                name: f.name().clone(),
                data_type: f.data_type().to_string(),
                sensitivity: None,
                description: String::new(),
                is_pii: false,
            })
            .collect();
        // Apply updates.
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .get_mut(&id)
            .ok_or_else(|| format!("dataset disappeared during resync: {name}"))?;
        manifest.row_count = Some(total_rows);
        manifest.columns = columns;
        manifest.schema_fingerprint = new_fp;
        manifest.updated_at = chrono::Utc::now();
        // Persist.
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;
        tracing::info!("resynced '{name}': row_count={total_rows}, {} columns", manifest.columns.len());
        Ok(manifest.clone())
    }
    /// Resync every dataset that currently has a null row_count or no columns.
    /// Returns (successes, failures): successes are (name, row_count),
    /// failures are (name, error message).
    pub async fn resync_missing(&self) -> (Vec<(String, u64)>, Vec<(String, String)>) {
        let names: Vec<String> = {
            let datasets = self.datasets.read().await;
            datasets
                .values()
                .filter(|d| d.row_count.is_none() || d.columns.is_empty())
                .map(|d| d.name.clone())
                .collect()
        };
        let mut ok = Vec::new();
        let mut err = Vec::new();
        for name in names {
            match self.resync_from_parquet(&name).await {
                Ok(m) => ok.push((name, m.row_count.unwrap_or(0))),
                Err(e) => err.push((name, e)),
            }
        }
        (ok, err)
    }
    /// Mark a dataset's embeddings as stale (row-level data has been written
    /// since the last embedding refresh). Idempotent — setting stale when
    /// already stale is a no-op. Only marks stale if the dataset has been
    /// embedded before — a never-embedded dataset doesn't need a stale flag
    /// (it just needs an initial index build). Called from the ingest path.
    pub async fn mark_embeddings_stale(&self, name: &str) -> Result<(), String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .values_mut()
            .find(|d| d.name == name)
            .ok_or_else(|| format!("dataset not found: {name}"))?;
        if manifest.last_embedded_at.is_none() {
            return Ok(()); // never embedded -> no stale semantics yet
        }
        if manifest.embedding_stale_since.is_none() {
            manifest.embedding_stale_since = Some(chrono::Utc::now());
            manifest.updated_at = chrono::Utc::now();
            let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
            let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
            ops::put(&self.store, &key, json.into()).await?;
            tracing::info!("marked embeddings stale for dataset '{name}'");
        }
        Ok(())
    }

    /// Clear the stale marker and set `last_embedded_at = now`.
    /// Called by the embedding refresh pipeline once it finishes.
    pub async fn clear_embeddings_stale(&self, name: &str) -> Result<(), String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .values_mut()
            .find(|d| d.name == name)
            .ok_or_else(|| format!("dataset not found: {name}"))?;
        let now = chrono::Utc::now();
        manifest.embedding_stale_since = None;
        manifest.last_embedded_at = Some(now);
        manifest.updated_at = now;
        let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &key, json.into()).await?;
        Ok(())
    }

    /// List datasets whose `embedding_stale_since` is set — they need a refresh.
    pub async fn stale_datasets(&self) -> Vec<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets
            .values()
            .filter(|d| d.embedding_stale_since.is_some())
            .cloned()
            .collect()
    }
    /// Add objects to an existing dataset.
    pub async fn add_objects(
        &self,
        id: &DatasetId,
        new_objects: Vec<ObjectRef>,
    ) -> Result<DatasetManifest, String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets.get_mut(id).ok_or_else(|| format!("dataset not found: {id}"))?;
        manifest.objects.extend(new_objects);
        manifest.updated_at = chrono::Utc::now();
        // Persist updated manifest
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;
        Ok(manifest.clone())
    }
}