profit 5b1fcf6d27 Phase 28-36 body of work
Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning):

- Phase 36: embed_semaphore on VectorState (permits=1) serializes
  seed embed calls — prevents sidecar socket collisions under
  concurrent /seed stress load
- Phase 31+: run_stress.ts 6-task diverse stress scaffolding;
  run_e2e_rated.ts + orchestrator.ts tightening
- Catalog dedupe cleanup: 16 duplicate manifests removed; canonical
  candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB ->
  11KB) regenerated post-dedupe; fresh manifests for active datasets
- vectord: harness EvalSet refinements (+181), agent portfolio
  rotation + ingest triggers (+158), autotune + rag adjustments
- catalogd/storaged/ingestd/mcp-server: misc tightening
- docs: Phase 28-36 PRD entries + DECISIONS ADR additions;
  control-plane pivot banner added to top of docs/PRD.md (pointing
  at docs/CONTROL_PLANE_PRD.md which lands in next commit)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 02:41:15 -05:00

use shared::types::{
AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ModelProfile,
ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone,
};
use crate::tombstones::TombstoneStore;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use storaged::ops;
use object_store::ObjectStore;
/// Make a view name safe for use as an object storage key.
/// Allows letters, digits, `_`, `-`, `.`. Anything else becomes `_`.
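/// # Example
///
/// A quick illustration of the substitution rule (shown as a
/// non-compiled sketch since this helper is private):
///
/// ```ignore
/// assert_eq!(sanitize_view_name("pii/safe view"), "pii_safe_view");
/// assert_eq!(sanitize_view_name("orders_v1.json"), "orders_v1.json");
/// ```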
fn sanitize_view_name(name: &str) -> String {
name.chars()
.map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.' { c } else { '_' })
.collect()
}
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct MigrateBucketsReport {
pub refs_examined: usize,
pub refs_renamed: usize, // legacy "data"/"local" → "primary"
pub refs_stamped: usize, // empty → "primary"
pub refs_unchanged: usize, // already canonical
pub manifests_persisted: usize,
}
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct DedupeReport {
pub groups: usize,
pub removed: usize,
pub kept: Vec<DedupeKept>,
pub errors: Vec<DedupeError>,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct DedupeKept {
pub name: String,
pub kept_id: String,
pub removed: usize,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct DedupeError {
pub manifest_id: String,
pub error: String,
}
/// Partial metadata update — only set fields are applied.
#[derive(Debug, Clone, Default, serde::Deserialize)]
pub struct MetadataUpdate {
pub description: Option<String>,
pub owner: Option<String>,
pub sensitivity: Option<Sensitivity>,
pub tags: Option<Vec<String>>,
pub columns: Option<Vec<ColumnMeta>>,
pub lineage: Option<Lineage>,
pub freshness: Option<FreshnessContract>,
pub row_count: Option<u64>,
// Phase C embedding freshness
pub embedding_refresh_policy: Option<RefreshPolicy>,
}
const MANIFEST_PREFIX: &str = "_catalog/manifests";
const VIEW_PREFIX: &str = "_catalog/views";
const PROFILE_PREFIX: &str = "_catalog/profiles";
/// In-memory dataset registry backed by manifest persistence in object storage.
/// Also tracks AiViews (Phase D) — safe projections over base datasets.
/// And tombstones (Phase E) — soft-delete markers applied at query time.
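/// # Example
///
/// A minimal startup sketch, assuming an in-memory `object_store`
/// backend (any `Arc<dyn ObjectStore>` behaves the same here):
///
/// ```ignore
/// use object_store::memory::InMemory;
/// use std::sync::Arc;
///
/// let registry = Registry::new(Arc::new(InMemory::new()));
/// // Hydrate manifests, views, and model profiles persisted earlier.
/// let loaded = registry.rebuild().await?;
/// println!("catalog ready with {loaded} datasets");
/// ```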
#[derive(Clone)]
pub struct Registry {
datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>,
views: Arc<RwLock<HashMap<String, AiView>>>,
profiles: Arc<RwLock<HashMap<String, ModelProfile>>>,
tombstones: TombstoneStore,
store: Arc<dyn ObjectStore>,
}
impl Registry {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
datasets: Arc::new(RwLock::new(HashMap::new())),
views: Arc::new(RwLock::new(HashMap::new())),
profiles: Arc::new(RwLock::new(HashMap::new())),
tombstones: TombstoneStore::new(store.clone()),
store,
}
}
pub fn tombstones(&self) -> &TombstoneStore {
&self.tombstones
}
/// Add a tombstone for a dataset row. Validates that the dataset
/// exists. Idempotent at the endpoint layer — callers dedup if needed.
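/// # Example
///
/// Call-shape sketch only; `registry` and the `"customers"` dataset
/// are assumed to exist already:
///
/// ```ignore
/// let ts = registry
///     .add_tombstone("customers", "customer_id", "C-1042", "gdpr-bot", "erasure request")
///     .await?;
/// assert_eq!(ts.dataset, "customers");
/// assert!(!registry.list_tombstones("customers").await?.is_empty());
/// ```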
pub async fn add_tombstone(
&self,
dataset: &str,
row_key_column: &str,
row_key_value: &str,
actor: &str,
reason: &str,
) -> Result<Tombstone, String> {
if self.get_by_name(dataset).await.is_none() {
return Err(format!("dataset not found: {dataset}"));
}
let ts = Tombstone {
dataset: dataset.to_string(),
row_key_column: row_key_column.to_string(),
row_key_value: row_key_value.to_string(),
deleted_at: chrono::Utc::now(),
actor: actor.to_string(),
reason: reason.to_string(),
};
self.tombstones.append(&ts).await?;
Ok(ts)
}
pub async fn list_tombstones(&self, dataset: &str) -> Result<Vec<Tombstone>, String> {
self.tombstones.list(dataset).await
}
/// Rebuild in-memory index from persisted manifests + views on startup.
pub async fn rebuild(&self) -> Result<usize, String> {
let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?;
let mut datasets = self.datasets.write().await;
datasets.clear();
for key in &keys {
let data = ops::get(&self.store, key).await?;
let manifest: DatasetManifest =
serde_json::from_slice(&data).map_err(|e| e.to_string())?;
datasets.insert(manifest.id.clone(), manifest);
}
let count = datasets.len();
tracing::info!("catalog rebuilt: {count} datasets loaded");
// Phase D: load AiView definitions alongside manifests.
let view_keys = ops::list(&self.store, Some(VIEW_PREFIX)).await.unwrap_or_default();
let mut views = self.views.write().await;
views.clear();
for key in &view_keys {
if !key.ends_with(".json") { continue; }
let data = match ops::get(&self.store, key).await {
Ok(d) => d,
Err(e) => {
tracing::warn!("view '{key}': read failed: {e}");
continue;
}
};
match serde_json::from_slice::<AiView>(&data) {
Ok(view) => { views.insert(view.name.clone(), view); }
Err(e) => tracing::warn!("view '{key}': parse failed: {e}"),
}
}
if !views.is_empty() {
tracing::info!("catalog: {} views loaded", views.len());
}
// Phase 17: load model profiles.
let profile_keys = ops::list(&self.store, Some(PROFILE_PREFIX)).await.unwrap_or_default();
let mut profiles = self.profiles.write().await;
profiles.clear();
for key in &profile_keys {
if !key.ends_with(".json") { continue; }
let data = match ops::get(&self.store, key).await {
Ok(d) => d,
Err(e) => { tracing::warn!("profile '{key}': read failed: {e}"); continue; }
};
match serde_json::from_slice::<ModelProfile>(&data) {
Ok(p) => { profiles.insert(p.id.clone(), p); }
Err(e) => tracing::warn!("profile '{key}': parse failed: {e}"),
}
}
if !profiles.is_empty() {
tracing::info!("catalog: {} model profiles loaded", profiles.len());
}
Ok(count)
}
// --- Phase 17: Model profiles ---
/// Create or replace a model profile. Validates id slug + ensures
/// every bound_dataset exists (as either a raw dataset or an AiView).
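/// # Example
///
/// Call-shape sketch; constructing the `ModelProfile` itself is left
/// to the caller because its full field set lives in `shared::types`:
///
/// ```ignore
/// async fn register_profile(registry: &Registry, profile: ModelProfile) -> Result<(), String> {
///     let stored = registry.put_profile(profile).await?;
///     assert!(registry.get_profile(&stored.id).await.is_some());
///     Ok(())
/// }
/// ```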
pub async fn put_profile(&self, profile: ModelProfile) -> Result<ModelProfile, String> {
if profile.id.is_empty() {
return Err("profile id is empty".into());
}
if !profile.id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') {
return Err(format!(
"profile id '{}' must be alphanumeric + '-' or '_' only", profile.id,
));
}
for binding in &profile.bound_datasets {
let exists = self.get_by_name(binding).await.is_some()
|| self.get_view(binding).await.is_some();
if !exists {
return Err(format!(
"bound dataset '{}' not found as dataset or view", binding,
));
}
}
let key = format!("{PROFILE_PREFIX}/{}.json", profile.id);
let json = serde_json::to_vec_pretty(&profile).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
let mut profiles = self.profiles.write().await;
profiles.insert(profile.id.clone(), profile.clone());
tracing::info!(
"profile registered: {} -> ollama={} bindings={:?}",
profile.id, profile.ollama_name, profile.bound_datasets,
);
Ok(profile)
}
pub async fn get_profile(&self, id: &str) -> Option<ModelProfile> {
self.profiles.read().await.get(id).cloned()
}
pub async fn list_profiles(&self) -> Vec<ModelProfile> {
self.profiles.read().await.values().cloned().collect()
}
pub async fn delete_profile(&self, id: &str) -> Result<(), String> {
let key = format!("{PROFILE_PREFIX}/{id}.json");
ops::delete(&self.store, &key).await?;
self.profiles.write().await.remove(id);
Ok(())
}
/// Register a dataset. Idempotent on `name`:
/// - No existing dataset by this name → create new manifest.
/// - Exists with same schema fingerprint → update objects + `updated_at`
/// in place (re-ingest of identical-shape data).
/// - Exists with different schema fingerprint → reject as `Err`
/// (callers map to HTTP 409 / gRPC FAILED_PRECONDITION). Schema
/// evolution is not yet exposed as an HTTP endpoint; callers must
/// register under a new name or delete the existing manifest first.
///
/// Concurrency: the write lock is held across the storage write. That
/// serializes concurrent registers but is correctness-mandatory —
/// dropping the lock across the check→insert sequence creates a TOCTOU
/// window where two callers with the same name both see "no existing"
/// and both insert, reintroducing the duplicate-manifest bug this
/// function exists to prevent. Register is a metadata-only hop on the
/// ingest path, so the serialization cost is acceptable. Precedent:
/// `update_metadata` also holds the write lock across its I/O.
///
/// Legacy state: if multiple manifests with the same name already exist
/// (from before this was idempotent), the most recently updated one is
/// treated as canonical. Run `dedupe_by_name` to clean up the rest.
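/// # Example
///
/// Sketch of the three outcomes described above, assuming `registry`
/// is live and `fp_a` / `fp_b` are distinct `SchemaFingerprint`s:
///
/// ```ignore
/// let first  = registry.register("orders".into(), fp_a.clone(), objs_v1).await?; // create
/// let second = registry.register("orders".into(), fp_a.clone(), objs_v2).await?; // idempotent update
/// assert_eq!(first.id, second.id);
/// let drift = registry.register("orders".into(), fp_b, objs_v3).await;           // schema mismatch
/// assert!(drift.is_err());
/// ```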
pub async fn register(
&self,
name: String,
schema_fingerprint: SchemaFingerprint,
objects: Vec<ObjectRef>,
) -> Result<DatasetManifest, String> {
let mut datasets = self.datasets.write().await;
let existing = datasets
.values()
.filter(|d| d.name == name)
.max_by_key(|d| d.updated_at)
.cloned();
if let Some(mut manifest) = existing {
if manifest.schema_fingerprint != schema_fingerprint {
return Err(format!(
"dataset '{}' already exists with a different schema \
(existing fingerprint: {}, new: {}). Schema-evolution \
migration is not yet exposed as an HTTP endpoint; \
register under a new name or delete the existing \
manifest first.",
name, manifest.schema_fingerprint.0, schema_fingerprint.0,
));
}
manifest.objects = objects;
manifest.updated_at = chrono::Utc::now();
let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &manifest_key, json.into()).await?;
datasets.insert(manifest.id.clone(), manifest.clone());
tracing::info!("re-registered (idempotent): {} ({})", manifest.name, manifest.id);
return Ok(manifest);
}
let now = chrono::Utc::now();
let manifest = DatasetManifest {
id: DatasetId::new(),
name,
schema_fingerprint,
objects,
created_at: now,
updated_at: now,
description: String::new(),
owner: String::new(),
sensitivity: None,
columns: vec![],
lineage: None,
freshness: None,
tags: vec![],
row_count: None,
last_embedded_at: None,
embedding_stale_since: None,
embedding_refresh_policy: None,
};
let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &manifest_key, json.into()).await?;
datasets.insert(manifest.id.clone(), manifest.clone());
tracing::info!("registered dataset: {} ({})", manifest.name, manifest.id);
Ok(manifest)
}
/// Update metadata on an existing dataset (owner, description, tags, sensitivity, etc.)
pub async fn update_metadata(
&self,
name: &str,
updates: MetadataUpdate,
) -> Result<DatasetManifest, String> {
let mut datasets = self.datasets.write().await;
let manifest = datasets.values_mut()
.find(|d| d.name == name)
.ok_or_else(|| format!("dataset not found: {name}"))?;
if let Some(desc) = updates.description { manifest.description = desc; }
if let Some(owner) = updates.owner { manifest.owner = owner; }
if let Some(sens) = updates.sensitivity { manifest.sensitivity = Some(sens); }
if let Some(tags) = updates.tags { manifest.tags = tags; }
if let Some(cols) = updates.columns { manifest.columns = cols; }
if let Some(lineage) = updates.lineage { manifest.lineage = Some(lineage); }
if let Some(freshness) = updates.freshness { manifest.freshness = Some(freshness); }
if let Some(count) = updates.row_count { manifest.row_count = Some(count); }
if let Some(policy) = updates.embedding_refresh_policy { manifest.embedding_refresh_policy = Some(policy); }
manifest.updated_at = chrono::Utc::now();
// Persist
let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &manifest_key, json.into()).await?;
let result = manifest.clone();
Ok(result)
}
/// Get a dataset by ID.
pub async fn get(&self, id: &DatasetId) -> Option<DatasetManifest> {
let datasets = self.datasets.read().await;
datasets.get(id).cloned()
}
/// Get a dataset by name.
pub async fn get_by_name(&self, name: &str) -> Option<DatasetManifest> {
let datasets = self.datasets.read().await;
datasets.values().find(|d| d.name == name).cloned()
}
/// List all datasets.
pub async fn list(&self) -> Vec<DatasetManifest> {
let datasets = self.datasets.read().await;
datasets.values().cloned().collect()
}
/// Re-read the parquet footer(s) for a dataset and repopulate `row_count`
/// and `columns` from reality. Use this to repair manifests whose
/// metadata was lost (e.g. migrated from a pre-Phase 10 catalog).
///
/// Does NOT touch owner/description/sensitivity/lineage/tags — only
/// the structural facts that parquet can tell us authoritatively.
/// The existing `schema_fingerprint` is updated if the recomputed one
/// differs; a warning is logged so drift is visible.
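/// # Example
///
/// Sketch of a one-off repair, assuming `"orders"` already has object
/// references that point at readable parquet:
///
/// ```ignore
/// let repaired = registry.resync_from_parquet("orders").await?;
/// println!("rows = {:?}, columns = {}", repaired.row_count, repaired.columns.len());
/// ```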
pub async fn resync_from_parquet(&self, name: &str) -> Result<DatasetManifest, String> {
use shared::arrow_helpers::{fingerprint_schema, parquet_to_record_batches};
// Snapshot the target manifest so we don't hold the write lock during IO.
let (id, objects, old_fp) = {
let datasets = self.datasets.read().await;
let m = datasets
.values()
.find(|d| d.name == name)
.ok_or_else(|| format!("dataset not found: {name}"))?;
(m.id.clone(), m.objects.clone(), m.schema_fingerprint.clone())
};
if objects.is_empty() {
return Err(format!("dataset '{name}' has no object references to resync from"));
}
let mut total_rows: u64 = 0;
let mut first_schema: Option<arrow::datatypes::SchemaRef> = None;
for obj in &objects {
let data = ops::get(&self.store, &obj.key).await
.map_err(|e| format!("read {}: {e}", obj.key))?;
let (schema, batches) = parquet_to_record_batches(&data)
.map_err(|e| format!("parse {}: {e}", obj.key))?;
let rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
total_rows += rows;
if first_schema.is_none() {
first_schema = Some(schema);
}
}
let schema = first_schema.ok_or("no schema recovered")?;
let new_fp = fingerprint_schema(&schema);
if new_fp != old_fp {
tracing::warn!(
"dataset '{}' schema fingerprint drift: {} -> {} (updating to match parquet reality)",
name, old_fp.0, new_fp.0,
);
}
let columns: Vec<ColumnMeta> = schema
.fields()
.iter()
.map(|f| ColumnMeta {
name: f.name().clone(),
data_type: f.data_type().to_string(),
sensitivity: None,
description: String::new(),
is_pii: false,
})
.collect();
// Apply updates.
let mut datasets = self.datasets.write().await;
let manifest = datasets
.get_mut(&id)
.ok_or_else(|| format!("dataset disappeared during resync: {name}"))?;
manifest.row_count = Some(total_rows);
manifest.columns = columns;
manifest.schema_fingerprint = new_fp;
manifest.updated_at = chrono::Utc::now();
// Persist.
let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &manifest_key, json.into()).await?;
tracing::info!("resynced '{name}': row_count={total_rows}, {} columns", manifest.columns.len());
Ok(manifest.clone())
}
/// Collapse duplicate manifests that share a `name`. Winner per group:
/// 1. Prefer a manifest with a non-null `row_count` (already resynced).
/// 2. Break ties by newest `updated_at`.
/// Losers are removed from the in-memory registry AND their manifest
/// JSON is deleted from object storage. Datasets with a single manifest
/// are untouched. Parquet data files are never touched — only catalog
/// metadata. See `register` for the prevention side of this fix.
///
/// Operator-only. Do not run while ingest is active: a concurrent
/// `register` between the snapshot and the delete sweep can create a
/// new manifest under a name we just deduped, producing a transient
/// count of 2 until the next dedupe run. Safe outside ingest windows.
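/// # Example
///
/// Operator sketch: run outside ingest windows, then inspect the report.
///
/// ```ignore
/// let report = registry.dedupe_by_name().await;
/// for kept in &report.kept {
///     println!("{}: kept {}, removed {}", kept.name, kept.kept_id, kept.removed);
/// }
/// assert!(report.errors.is_empty(), "storage deletes should all succeed");
/// ```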
pub async fn dedupe_by_name(&self) -> DedupeReport {
let groups: Vec<(String, Vec<DatasetManifest>)> = {
let datasets = self.datasets.read().await;
let mut by_name: HashMap<String, Vec<DatasetManifest>> = HashMap::new();
for m in datasets.values() {
by_name.entry(m.name.clone()).or_default().push(m.clone());
}
by_name.into_iter().filter(|(_, v)| v.len() > 1).collect()
};
let mut report = DedupeReport::default();
let mut to_delete: Vec<DatasetId> = Vec::new();
for (name, mut manifests) in groups {
report.groups += 1;
manifests.sort_by(|a, b| {
b.row_count.is_some().cmp(&a.row_count.is_some())
.then(b.updated_at.cmp(&a.updated_at))
});
let winner = &manifests[0];
report.kept.push(DedupeKept {
name: name.clone(),
kept_id: winner.id.to_string(),
removed: manifests.len() - 1,
});
for loser in &manifests[1..] {
to_delete.push(loser.id.clone());
}
}
{
let mut datasets = self.datasets.write().await;
for id in &to_delete {
datasets.remove(id);
}
}
for id in &to_delete {
let key = format!("{MANIFEST_PREFIX}/{}.json", id);
match ops::delete(&self.store, &key).await {
Ok(_) => report.removed += 1,
Err(e) => report.errors.push(DedupeError {
manifest_id: id.to_string(),
error: e,
}),
}
}
tracing::info!(
"dedupe: {} groups collapsed, {} manifests removed, {} errors",
report.groups, report.removed, report.errors.len(),
);
report
}
/// Resync every dataset that currently has a null `row_count` or no
/// column metadata.
/// Returns (successes, failures): successes carry (name, row_count),
/// failures carry (name, error message).
pub async fn resync_missing(&self) -> (Vec<(String, u64)>, Vec<(String, String)>) {
let names: Vec<String> = {
let datasets = self.datasets.read().await;
datasets
.values()
.filter(|d| d.row_count.is_none() || d.columns.is_empty())
.map(|d| d.name.clone())
.collect()
};
let mut ok = Vec::new();
let mut err = Vec::new();
for name in names {
match self.resync_from_parquet(&name).await {
Ok(m) => ok.push((name, m.row_count.unwrap_or(0))),
Err(e) => err.push((name, e)),
}
}
(ok, err)
}
/// Mark a dataset's embeddings as stale (row-level data has been written
/// since the last embedding refresh). Idempotent — setting stale when
/// already stale is a no-op. Only marks stale if the dataset has been
/// embedded before — a never-embedded dataset doesn't need a stale flag
/// (it just needs an initial index build). Called from the ingest path.
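/// # Example
///
/// Sketch of the staleness lifecycle (ingest marks, refresh clears),
/// assuming `"orders"` has been embedded at least once:
///
/// ```ignore
/// registry.mark_embeddings_stale("orders").await?;   // ingest path
/// assert!(registry.stale_datasets().await.iter().any(|d| d.name == "orders"));
/// registry.clear_embeddings_stale("orders").await?;  // refresh pipeline
/// assert!(!registry.stale_datasets().await.iter().any(|d| d.name == "orders"));
/// ```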
pub async fn mark_embeddings_stale(&self, name: &str) -> Result<(), String> {
let mut datasets = self.datasets.write().await;
let manifest = datasets
.values_mut()
.find(|d| d.name == name)
.ok_or_else(|| format!("dataset not found: {name}"))?;
if manifest.last_embedded_at.is_none() {
return Ok(()); // never embedded -> no stale semantics yet
}
if manifest.embedding_stale_since.is_none() {
manifest.embedding_stale_since = Some(chrono::Utc::now());
manifest.updated_at = chrono::Utc::now();
let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
tracing::info!("marked embeddings stale for dataset '{name}'");
}
Ok(())
}
/// Clear the stale marker and set `last_embedded_at = now`.
/// Called by the embedding refresh pipeline once it finishes.
pub async fn clear_embeddings_stale(&self, name: &str) -> Result<(), String> {
let mut datasets = self.datasets.write().await;
let manifest = datasets
.values_mut()
.find(|d| d.name == name)
.ok_or_else(|| format!("dataset not found: {name}"))?;
let now = chrono::Utc::now();
manifest.embedding_stale_since = None;
manifest.last_embedded_at = Some(now);
manifest.updated_at = now;
let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
Ok(())
}
/// Federation layer 2: stamp `bucket = "primary"` on every ObjectRef
/// whose `bucket` field is empty or matches a legacy value (`"data"`,
/// `"local"`). One-shot migration; re-running is a safe no-op once
/// every ref is canonical.
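/// # Example
///
/// One-shot migration sketch; the second run should report nothing
/// left to rename or stamp:
///
/// ```ignore
/// let first = registry.migrate_buckets_to_primary().await?;
/// println!("renamed {}, stamped {}", first.refs_renamed, first.refs_stamped);
/// let second = registry.migrate_buckets_to_primary().await?;
/// assert_eq!(second.refs_renamed + second.refs_stamped, 0, "second run is a no-op");
/// ```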
pub async fn migrate_buckets_to_primary(&self) -> Result<MigrateBucketsReport, String> {
let mut datasets = self.datasets.write().await;
let mut report = MigrateBucketsReport::default();
let mut to_persist: Vec<(DatasetId, DatasetManifest)> = Vec::new();
for manifest in datasets.values_mut() {
let mut changed = false;
for obj in manifest.objects.iter_mut() {
report.refs_examined += 1;
if obj.bucket.is_empty() {
obj.bucket = "primary".to_string();
report.refs_stamped += 1;
changed = true;
} else if obj.bucket == "data" || obj.bucket == "local" {
obj.bucket = "primary".to_string();
report.refs_renamed += 1;
changed = true;
} else {
report.refs_unchanged += 1;
}
}
if changed {
manifest.updated_at = chrono::Utc::now();
to_persist.push((manifest.id.clone(), manifest.clone()));
}
}
// Persist updated manifests after we've finished mutating the map.
for (id, manifest) in to_persist {
let key = format!("{MANIFEST_PREFIX}/{}.json", id);
let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
report.manifests_persisted += 1;
}
tracing::info!(
"bucket migration: examined {} refs, renamed {}, stamped {}, unchanged {}, persisted {} manifests",
report.refs_examined,
report.refs_renamed,
report.refs_stamped,
report.refs_unchanged,
report.manifests_persisted,
);
Ok(report)
}
// --- Phase D: AI-safe views ---
/// Create or replace a named view. Validates that the base dataset
/// exists and that the column whitelist is non-empty. Persists to
/// `_catalog/views/{name}.json`.
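/// # Example
///
/// Call-shape sketch; constructing the `AiView` itself is left to the
/// caller because its full field set lives in `shared::types`:
///
/// ```ignore
/// async fn register_view(registry: &Registry, view: AiView) -> Result<(), String> {
///     let stored = registry.put_view(view).await?;
///     assert!(registry.get_view(&stored.name).await.is_some());
///     Ok(())
/// }
/// ```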
pub async fn put_view(&self, mut view: AiView) -> Result<AiView, String> {
if view.name.is_empty() {
return Err("view name is empty".into());
}
if view.columns.is_empty() {
return Err("view must whitelist at least one column".into());
}
// Base dataset must exist (read-side will fail anyway, fail-fast here
// so operators don't end up with dangling views).
if self.get_by_name(&view.base_dataset).await.is_none() {
return Err(format!(
"base dataset '{}' not found in catalog",
view.base_dataset
));
}
if view.created_at.timestamp() == 0 {
view.created_at = chrono::Utc::now();
}
let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(&view.name));
let json = serde_json::to_vec_pretty(&view).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
let mut views = self.views.write().await;
views.insert(view.name.clone(), view.clone());
tracing::info!("view registered: {} over '{}'", view.name, view.base_dataset);
Ok(view)
}
pub async fn get_view(&self, name: &str) -> Option<AiView> {
self.views.read().await.get(name).cloned()
}
pub async fn list_views(&self) -> Vec<AiView> {
self.views.read().await.values().cloned().collect()
}
pub async fn delete_view(&self, name: &str) -> Result<(), String> {
let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(name));
ops::delete(&self.store, &key).await?;
self.views.write().await.remove(name);
Ok(())
}
/// Remove a dataset from the catalog by name. Deletes the manifest
/// from both the in-memory registry and object storage.
///
/// Scope is metadata-only: the underlying parquet files, vector
/// indexes, tombstones, trial journals, and AiViews that reference
/// this dataset are NOT touched. Caller is responsible for any
/// cascade cleanup. This mirrors how a DROP TABLE in a typical
/// warehouse separates "forget about this dataset" from "actually
/// reclaim the bytes".
///
/// Returns `Ok(n)` with the number of manifests removed when the dataset
/// existed; an Err with "dataset not found" when no manifest by that
/// name exists.
/// Legacy state with duplicate names: all matching manifests are
/// removed (effectively a post-hoc dedupe on that name).
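/// # Example
///
/// Metadata-only drop sketch; parquet bytes and indexes stay in place:
///
/// ```ignore
/// let removed = registry.delete_dataset("orders").await?;
/// assert!(removed >= 1);
/// assert!(registry.get_by_name("orders").await.is_none());
/// ```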
pub async fn delete_dataset(&self, name: &str) -> Result<usize, String> {
let ids_to_remove: Vec<DatasetId> = {
let datasets = self.datasets.read().await;
datasets.values().filter(|d| d.name == name).map(|d| d.id.clone()).collect()
};
if ids_to_remove.is_empty() {
return Err(format!("dataset not found: {name}"));
}
for id in &ids_to_remove {
let key = format!("{MANIFEST_PREFIX}/{}.json", id);
if let Err(e) = ops::delete(&self.store, &key).await {
// Storage delete failed — log and keep going. The in-memory
// remove below is still correct; on restart, the missing
// storage object just means nothing to rehydrate.
tracing::warn!("delete_dataset '{name}': storage delete of {key} failed: {e}");
}
}
let mut datasets = self.datasets.write().await;
for id in &ids_to_remove {
datasets.remove(id);
}
tracing::info!("deleted dataset '{}' ({} manifest(s))", name, ids_to_remove.len());
Ok(ids_to_remove.len())
}
/// List datasets whose `embedding_stale_since` is set — they need a refresh.
pub async fn stale_datasets(&self) -> Vec<DatasetManifest> {
let datasets = self.datasets.read().await;
datasets
.values()
.filter(|d| d.embedding_stale_since.is_some())
.cloned()
.collect()
}
/// Add objects to an existing dataset.
pub async fn add_objects(
&self,
id: &DatasetId,
new_objects: Vec<ObjectRef>,
) -> Result<DatasetManifest, String> {
let mut datasets = self.datasets.write().await;
let manifest = datasets.get_mut(id).ok_or_else(|| format!("dataset not found: {id}"))?;
manifest.objects.extend(new_objects);
manifest.updated_at = chrono::Utc::now();
// Persist updated manifest
let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &manifest_key, json.into()).await?;
Ok(manifest.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::memory::InMemory;
fn fixture() -> Registry {
Registry::new(Arc::new(InMemory::new()))
}
fn fp(s: &str) -> SchemaFingerprint {
SchemaFingerprint(s.to_string())
}
fn obj(key: &str) -> ObjectRef {
ObjectRef {
bucket: "primary".to_string(),
key: key.to_string(),
size_bytes: 100,
created_at: chrono::Utc::now(),
}
}
#[tokio::test]
async fn fresh_register_creates_new_manifest() {
let reg = fixture();
let m = reg.register("a".into(), fp("f1"), vec![obj("a.parquet")]).await.unwrap();
assert_eq!(m.name, "a");
assert_eq!(m.schema_fingerprint, fp("f1"));
assert_eq!(reg.list().await.len(), 1);
assert_eq!(reg.get_by_name("a").await.unwrap().id, m.id);
}
#[tokio::test]
async fn re_register_same_fingerprint_is_idempotent() {
let reg = fixture();
let first = reg.register("a".into(), fp("f1"), vec![obj("old.parquet")]).await.unwrap();
let second = reg.register("a".into(), fp("f1"), vec![obj("new.parquet")]).await.unwrap();
assert_eq!(first.id, second.id, "same name+fp should reuse ID");
assert_eq!(reg.list().await.len(), 1, "no duplicate manifest created");
let fetched = reg.get_by_name("a").await.unwrap();
assert_eq!(fetched.objects[0].key, "new.parquet", "objects replaced");
assert!(fetched.updated_at >= first.updated_at, "updated_at bumped");
}
#[tokio::test]
async fn re_register_different_fingerprint_rejects() {
let reg = fixture();
reg.register("a".into(), fp("f1"), vec![obj("a.parquet")]).await.unwrap();
let err = reg.register("a".into(), fp("f2"), vec![obj("a.parquet")]).await.unwrap_err();
assert!(err.contains("different schema"), "error mentions schema drift: {err}");
assert_eq!(reg.list().await.len(), 1, "failed register did not mutate state");
assert_eq!(reg.get_by_name("a").await.unwrap().schema_fingerprint, fp("f1"));
}
#[tokio::test]
async fn delete_dataset_removes_manifest_and_in_memory_entry() {
let reg = fixture();
reg.register("to_delete".into(), fp("f1"), vec![obj("d.parquet")]).await.unwrap();
reg.register("keepme".into(), fp("f2"), vec![obj("k.parquet")]).await.unwrap();
assert_eq!(reg.list().await.len(), 2);
let removed = reg.delete_dataset("to_delete").await.unwrap();
assert_eq!(removed, 1, "one manifest removed");
assert!(reg.get_by_name("to_delete").await.is_none());
assert!(reg.get_by_name("keepme").await.is_some(), "unrelated dataset untouched");
assert_eq!(reg.list().await.len(), 1);
}
#[tokio::test]
async fn delete_dataset_404s_on_unknown_name() {
let reg = fixture();
let err = reg.delete_dataset("nope").await.unwrap_err();
assert!(err.starts_with("dataset not found"), "error wording: {err}");
}
#[tokio::test]
async fn delete_dataset_removes_all_dupes_with_matching_name() {
// Legacy zombies: direct insert so we can seed duplicates,
// mirroring pre-idempotent-register state. delete_dataset should
// sweep every manifest sharing that name in a single call.
let reg = fixture();
let now = chrono::Utc::now();
for _ in 0..3 {
let m = DatasetManifest {
id: DatasetId::new(),
name: "zombie".into(),
schema_fingerprint: fp("fp"),
objects: vec![obj("z.parquet")],
created_at: now, updated_at: now,
row_count: None, description: String::new(), owner: String::new(),
sensitivity: None, columns: vec![], lineage: None, freshness: None,
tags: vec![], last_embedded_at: None, embedding_stale_since: None,
embedding_refresh_policy: None,
};
reg.datasets.write().await.insert(m.id.clone(), m);
}
assert_eq!(reg.list().await.len(), 3);
let removed = reg.delete_dataset("zombie").await.unwrap();
assert_eq!(removed, 3, "all three manifests removed in one sweep");
assert_eq!(reg.list().await.len(), 0);
}
#[tokio::test]
async fn dedupe_no_duplicates_is_noop() {
let reg = fixture();
reg.register("a".into(), fp("f1"), vec![obj("a.parquet")]).await.unwrap();
reg.register("b".into(), fp("f2"), vec![obj("b.parquet")]).await.unwrap();
let report = reg.dedupe_by_name().await;
assert_eq!(report.groups, 0);
assert_eq!(report.removed, 0);
assert_eq!(reg.list().await.len(), 2);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn concurrent_register_stress_many_workers_one_manifest() {
// Stress variant: fire 32 concurrent registers for the same new
// name across 8 worker threads. Single-manifest invariant must
// hold regardless of how the scheduler interleaves them. This
// catches race regressions the 2-call variant below might miss
// under a forgiving scheduler.
let reg = fixture();
let mut handles = Vec::new();
for i in 0..32 {
let r = reg.clone();
handles.push(tokio::spawn(async move {
r.register(
"stress".into(),
fp("stress-fp"),
vec![obj(&format!("{i}.parquet"))],
).await
}));
}
for h in handles {
h.await.unwrap().expect("each register succeeds (idempotent)");
}
let all = reg.list().await;
let stress_manifests: Vec<_> = all.iter().filter(|d| d.name == "stress").collect();
assert_eq!(stress_manifests.len(), 1,
"32 concurrent registers produced {} manifests for 'stress' — expected 1",
stress_manifests.len());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_register_same_new_name_collapses_to_one_manifest() {
// Pins the TOCTOU fix: without the write-lock held across the
// check→insert sequence, two parallel registers of the same
// previously-unknown name would both see "no existing" and both
// insert. Multi-thread flavor is required to actually exercise
// the race — under single-threaded tokio the scheduler doesn't
// preempt between awaits on the same task's critical section.
// Under the current impl, whichever acquires the write lock
// first creates the manifest; the second observes it and takes
// the idempotent-update branch.
let reg = fixture();
let a = reg.clone();
let b = reg.clone();
let (ra, rb) = tokio::join!(
async move {
a.register("race".into(), fp("shared-fp"), vec![obj("a.parquet")]).await
},
async move {
b.register("race".into(), fp("shared-fp"), vec![obj("b.parquet")]).await
},
);
let m_a = ra.expect("first register succeeds");
let m_b = rb.expect("second register succeeds (same fp = idempotent)");
assert_eq!(m_a.id, m_b.id, "both calls resolve to a single DatasetId");
assert_eq!(reg.list().await.len(), 1, "no duplicate manifest created");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_register_same_name_different_fp_one_wins_one_rejects() {
// Reverse case: concurrent registers with conflicting fingerprints.
// The outcome shape is deterministic: one call's fingerprint is persisted as
// canonical, the other is rejected on seeing the now-established
// fingerprint. Which one wins is scheduler-dependent — we only
// assert exactly one success and exactly one rejection.
let reg = fixture();
let a = reg.clone();
let b = reg.clone();
let (ra, rb) = tokio::join!(
async move { a.register("race".into(), fp("fp1"), vec![obj("a.parquet")]).await },
async move { b.register("race".into(), fp("fp2"), vec![obj("b.parquet")]).await },
);
let successes = [&ra, &rb].iter().filter(|r| r.is_ok()).count();
let rejections = [&ra, &rb].iter().filter(|r| r.is_err()).count();
assert_eq!(successes, 1, "exactly one register wins");
assert_eq!(rejections, 1, "the other is rejected on fingerprint");
let rejection_msg = [&ra, &rb].iter()
.find_map(|r| r.as_ref().err())
.unwrap();
assert!(rejection_msg.contains("different schema"));
assert_eq!(reg.list().await.len(), 1);
}
#[tokio::test]
async fn dedupe_collapses_dupes_preferring_non_null_row_count() {
let reg = fixture();
// Simulate legacy pre-idempotent-register state by inserting
// manifests with the same `name` directly. Mirrors what happened
// with the 308× successful_playbooks duplication.
let now = chrono::Utc::now();
let old_with_rows = DatasetManifest {
id: DatasetId::new(),
name: "dupes".into(),
schema_fingerprint: fp("f1"),
objects: vec![obj("dupes.parquet")],
created_at: now - chrono::Duration::hours(2),
updated_at: now - chrono::Duration::hours(2),
row_count: Some(42),
description: String::new(),
owner: String::new(),
sensitivity: None,
columns: vec![],
lineage: None,
freshness: None,
tags: vec![],
last_embedded_at: None,
embedding_stale_since: None,
embedding_refresh_policy: None,
};
let newer_no_rows = DatasetManifest {
id: DatasetId::new(),
updated_at: now, // newer, but null row_count
row_count: None,
..old_with_rows.clone()
};
let oldest = DatasetManifest {
id: DatasetId::new(),
updated_at: now - chrono::Duration::hours(5),
row_count: None,
..old_with_rows.clone()
};
let kept_id = old_with_rows.id.clone();
let loser_a = newer_no_rows.id.clone();
let loser_b = oldest.id.clone();
{
let mut d = reg.datasets.write().await;
d.insert(old_with_rows.id.clone(), old_with_rows);
d.insert(newer_no_rows.id.clone(), newer_no_rows);
d.insert(oldest.id.clone(), oldest);
}
assert_eq!(reg.list().await.len(), 3);
let report = reg.dedupe_by_name().await;
assert_eq!(report.groups, 1);
assert_eq!(report.removed, 2);
assert_eq!(report.errors.len(), 0);
assert_eq!(report.kept.len(), 1);
assert_eq!(report.kept[0].name, "dupes");
assert_eq!(report.kept[0].kept_id, kept_id.to_string(),
"should keep the manifest with non-null row_count");
let remaining = reg.list().await;
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].id, kept_id);
assert!(reg.get(&loser_a).await.is_none());
assert!(reg.get(&loser_b).await.is_none());
}
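// Illustrative extra check (a sketch, mirroring the documented
// early-return in mark_embeddings_stale): a never-embedded dataset
// must not acquire a stale marker. Uses only the helpers defined above.
#[tokio::test]
async fn mark_stale_is_noop_for_never_embedded_dataset() {
    let reg = fixture();
    reg.register("plain".into(), fp("f1"), vec![obj("p.parquet")]).await.unwrap();
    reg.mark_embeddings_stale("plain").await.unwrap();
    assert!(reg.stale_datasets().await.is_empty(), "no stale marker before first embed");
}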
}