use shared::types::{
    AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ModelProfile,
    ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone,
};

use crate::tombstones::TombstoneStore;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

use storaged::ops;
use object_store::ObjectStore;

/// Make a view name safe for use as an object storage key.
/// Allows letters, digits, `_`, `-`, `.`. Anything else becomes `_`.
fn sanitize_view_name(name: &str) -> String {
    name.chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.' {
                c
            } else {
                '_'
            }
        })
        .collect()
}

#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct MigrateBucketsReport {
    pub refs_examined: usize,
    pub refs_renamed: usize,   // legacy "data"/"local" → "primary"
    pub refs_stamped: usize,   // empty → "primary"
    pub refs_unchanged: usize, // already canonical
    pub manifests_persisted: usize,
}

#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct DedupeReport {
    pub groups: usize,
    pub removed: usize,
    pub kept: Vec<DedupeKept>,
    pub errors: Vec<DedupeError>,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct DedupeKept {
    pub name: String,
    pub kept_id: String,
    pub removed: usize,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct DedupeError {
    pub manifest_id: String,
    pub error: String,
}

/// Partial metadata update — only set fields are applied.
#[derive(Debug, Clone, Default, serde::Deserialize)]
pub struct MetadataUpdate {
    pub description: Option<String>,
    pub owner: Option<String>,
    pub sensitivity: Option<Sensitivity>,
    pub tags: Option<Vec<String>>,
    pub columns: Option<Vec<ColumnMeta>>,
    pub lineage: Option<Lineage>,
    pub freshness: Option<FreshnessContract>,
    pub row_count: Option<u64>,
    // Phase C embedding freshness
    pub embedding_refresh_policy: Option<RefreshPolicy>,
}

const MANIFEST_PREFIX: &str = "_catalog/manifests";
const VIEW_PREFIX: &str = "_catalog/views";
const PROFILE_PREFIX: &str = "_catalog/profiles";

/// In-memory dataset registry backed by manifest persistence in object storage.
/// Also tracks AiViews (Phase D) — safe projections over base datasets.
/// And tombstones (Phase E) — soft-delete markers applied at query time.
#[derive(Clone)]
pub struct Registry {
    datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>,
    views: Arc<RwLock<HashMap<String, AiView>>>,
    profiles: Arc<RwLock<HashMap<String, ModelProfile>>>,
    tombstones: TombstoneStore,
    store: Arc<dyn ObjectStore>,
}

impl Registry {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            datasets: Arc::new(RwLock::new(HashMap::new())),
            views: Arc::new(RwLock::new(HashMap::new())),
            profiles: Arc::new(RwLock::new(HashMap::new())),
            tombstones: TombstoneStore::new(store.clone()),
            store,
        }
    }

    pub fn tombstones(&self) -> &TombstoneStore {
        &self.tombstones
    }

    /// Add a tombstone for a dataset row. Validates that the dataset
    /// exists. Idempotent at the endpoint layer — callers dedup if needed.
    pub async fn add_tombstone(
        &self,
        dataset: &str,
        row_key_column: &str,
        row_key_value: &str,
        actor: &str,
        reason: &str,
    ) -> Result<Tombstone, String> {
        if self.get_by_name(dataset).await.is_none() {
            return Err(format!("dataset not found: {dataset}"));
        }
        let ts = Tombstone {
            dataset: dataset.to_string(),
            row_key_column: row_key_column.to_string(),
            row_key_value: row_key_value.to_string(),
            deleted_at: chrono::Utc::now(),
            actor: actor.to_string(),
            reason: reason.to_string(),
        };
        self.tombstones.append(&ts).await?;
        Ok(ts)
    }

    pub async fn list_tombstones(&self, dataset: &str) -> Result<Vec<Tombstone>, String> {
        self.tombstones.list(dataset).await
    }

    /// Rebuild in-memory index from persisted manifests + views on startup.
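    ///
    /// A minimal call-site sketch (illustrative wiring only, hence `ignore`;
    /// assumes a `Registry` constructed over any `ObjectStore`):
    ///
    /// ```ignore
    /// let registry = Registry::new(Arc::new(object_store::memory::InMemory::new()));
    /// let loaded = registry.rebuild().await?; // returns how many datasets were hydrated
    /// tracing::info!("catalog ready with {loaded} datasets");
    /// ```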
    pub async fn rebuild(&self) -> Result<usize, String> {
        let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?;
        let mut datasets = self.datasets.write().await;
        datasets.clear();
        for key in &keys {
            let data = ops::get(&self.store, key).await?;
            let manifest: DatasetManifest =
                serde_json::from_slice(&data).map_err(|e| e.to_string())?;
            datasets.insert(manifest.id.clone(), manifest);
        }
        let count = datasets.len();
        tracing::info!("catalog rebuilt: {count} datasets loaded");

        // Phase D: load AiView definitions alongside manifests.
        let view_keys = ops::list(&self.store, Some(VIEW_PREFIX)).await.unwrap_or_default();
        let mut views = self.views.write().await;
        views.clear();
        for key in &view_keys {
            if !key.ends_with(".json") {
                continue;
            }
            let data = match ops::get(&self.store, key).await {
                Ok(d) => d,
                Err(e) => {
                    tracing::warn!("view '{key}': read failed: {e}");
                    continue;
                }
            };
            match serde_json::from_slice::<AiView>(&data) {
                Ok(view) => {
                    views.insert(view.name.clone(), view);
                }
                Err(e) => tracing::warn!("view '{key}': parse failed: {e}"),
            }
        }
        if !views.is_empty() {
            tracing::info!("catalog: {} views loaded", views.len());
        }

        // Phase 17: load model profiles.
        let profile_keys = ops::list(&self.store, Some(PROFILE_PREFIX)).await.unwrap_or_default();
        let mut profiles = self.profiles.write().await;
        profiles.clear();
        for key in &profile_keys {
            if !key.ends_with(".json") {
                continue;
            }
            let data = match ops::get(&self.store, key).await {
                Ok(d) => d,
                Err(e) => {
                    tracing::warn!("profile '{key}': read failed: {e}");
                    continue;
                }
            };
            match serde_json::from_slice::<ModelProfile>(&data) {
                Ok(p) => {
                    profiles.insert(p.id.clone(), p);
                }
                Err(e) => tracing::warn!("profile '{key}': parse failed: {e}"),
            }
        }
        if !profiles.is_empty() {
            tracing::info!("catalog: {} model profiles loaded", profiles.len());
        }

        Ok(count)
    }

    // --- Phase 17: Model profiles ---

    /// Create or replace a model profile. Validates id slug + ensures
    /// every bound_dataset exists (as either a raw dataset or an AiView).
    pub async fn put_profile(&self, profile: ModelProfile) -> Result<ModelProfile, String> {
        if profile.id.is_empty() {
            return Err("profile id is empty".into());
        }
        if !profile.id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') {
            return Err(format!(
                "profile id '{}' must be alphanumeric + '-' or '_' only",
                profile.id,
            ));
        }
        for binding in &profile.bound_datasets {
            let exists = self.get_by_name(binding).await.is_some()
                || self.get_view(binding).await.is_some();
            if !exists {
                return Err(format!(
                    "bound dataset '{}' not found as dataset or view",
                    binding,
                ));
            }
        }
        let key = format!("{PROFILE_PREFIX}/{}.json", profile.id);
        let json = serde_json::to_vec_pretty(&profile).map_err(|e| e.to_string())?;
        ops::put(&self.store, &key, json.into()).await?;
        let mut profiles = self.profiles.write().await;
        profiles.insert(profile.id.clone(), profile.clone());
        tracing::info!(
            "profile registered: {} -> ollama={} bindings={:?}",
            profile.id,
            profile.ollama_name,
            profile.bound_datasets,
        );
        Ok(profile)
    }

    pub async fn get_profile(&self, id: &str) -> Option<ModelProfile> {
        self.profiles.read().await.get(id).cloned()
    }

    pub async fn list_profiles(&self) -> Vec<ModelProfile> {
        self.profiles.read().await.values().cloned().collect()
    }

    pub async fn delete_profile(&self, id: &str) -> Result<(), String> {
        let key = format!("{PROFILE_PREFIX}/{id}.json");
        ops::delete(&self.store, &key).await?;
        self.profiles.write().await.remove(id);
        Ok(())
    }

    /// Register a dataset. Idempotent on `name`:
    /// - No existing dataset by this name → create new manifest.
    /// - Exists with same schema fingerprint → update objects + `updated_at`
    ///   in place (re-ingest of identical-shape data).
    /// - Exists with different schema fingerprint → reject as `Err`
    ///   (callers map to HTTP 409 / gRPC FAILED_PRECONDITION). Schema
    ///   evolution is not yet exposed as an HTTP endpoint; callers must
    ///   register under a new name or delete the existing manifest first.
    ///
    /// Concurrency: the write lock is held across the storage write. That
    /// serializes concurrent registers but is correctness-mandatory —
    /// dropping the lock across the check→insert sequence creates a TOCTOU
    /// window where two callers with the same name both see "no existing"
    /// and both insert, reintroducing the duplicate-manifest bug this
    /// function exists to prevent. Register is a metadata-only hop on the
    /// ingest path, so the serialization cost is acceptable. Precedent:
    /// `update_metadata` also holds the write lock across its I/O.
    ///
    /// Legacy state: if multiple manifests with the same name already exist
    /// (from before this was idempotent), the most recently updated one is
    /// treated as canonical. Run `dedupe_by_name` to clean up the rest.
    pub async fn register(
        &self,
        name: String,
        schema_fingerprint: SchemaFingerprint,
        objects: Vec<ObjectRef>,
    ) -> Result<DatasetManifest, String> {
        let mut datasets = self.datasets.write().await;

        let existing = datasets
            .values()
            .filter(|d| d.name == name)
            .max_by_key(|d| d.updated_at)
            .cloned();

        if let Some(mut manifest) = existing {
            if manifest.schema_fingerprint != schema_fingerprint {
                return Err(format!(
                    "dataset '{}' already exists with a different schema \
                     (existing fingerprint: {}, new: {}). Schema-evolution \
                     migration is not yet exposed as an HTTP endpoint; \
                     register under a new name or delete the existing \
                     manifest first.",
                    name, manifest.schema_fingerprint.0, schema_fingerprint.0,
                ));
            }
            manifest.objects = objects;
            manifest.updated_at = chrono::Utc::now();
            let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
            let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
            ops::put(&self.store, &manifest_key, json.into()).await?;
            datasets.insert(manifest.id.clone(), manifest.clone());
            tracing::info!("re-registered (idempotent): {} ({})", manifest.name, manifest.id);
            return Ok(manifest);
        }

        let now = chrono::Utc::now();
        let manifest = DatasetManifest {
            id: DatasetId::new(),
            name,
            schema_fingerprint,
            objects,
            created_at: now,
            updated_at: now,
            description: String::new(),
            owner: String::new(),
            sensitivity: None,
            columns: vec![],
            lineage: None,
            freshness: None,
            tags: vec![],
            row_count: None,
            last_embedded_at: None,
            embedding_stale_since: None,
            embedding_refresh_policy: None,
        };
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;
        datasets.insert(manifest.id.clone(), manifest.clone());
        tracing::info!("registered dataset: {} ({})", manifest.name, manifest.id);
        Ok(manifest)
    }

    /// Update metadata on an existing dataset (owner, description, tags, sensitivity, etc.).
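    ///
    /// Only fields set on the `MetadataUpdate` are applied; unset fields are
    /// left untouched. A sketch of a typical partial update (dataset name and
    /// field values are illustrative only, hence `ignore`):
    ///
    /// ```ignore
    /// let update = MetadataUpdate {
    ///     owner: Some("data-platform".into()),
    ///     tags: Some(vec!["gold".into(), "orders".into()]),
    ///     ..Default::default()
    /// };
    /// let manifest = registry.update_metadata("orders", update).await?;
    /// assert_eq!(manifest.owner, "data-platform");
    /// ```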
    pub async fn update_metadata(
        &self,
        name: &str,
        updates: MetadataUpdate,
    ) -> Result<DatasetManifest, String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .values_mut()
            .find(|d| d.name == name)
            .ok_or_else(|| format!("dataset not found: {name}"))?;

        if let Some(desc) = updates.description {
            manifest.description = desc;
        }
        if let Some(owner) = updates.owner {
            manifest.owner = owner;
        }
        if let Some(sens) = updates.sensitivity {
            manifest.sensitivity = Some(sens);
        }
        if let Some(tags) = updates.tags {
            manifest.tags = tags;
        }
        if let Some(cols) = updates.columns {
            manifest.columns = cols;
        }
        if let Some(lineage) = updates.lineage {
            manifest.lineage = Some(lineage);
        }
        if let Some(freshness) = updates.freshness {
            manifest.freshness = Some(freshness);
        }
        if let Some(count) = updates.row_count {
            manifest.row_count = Some(count);
        }
        if let Some(policy) = updates.embedding_refresh_policy {
            manifest.embedding_refresh_policy = Some(policy);
        }
        manifest.updated_at = chrono::Utc::now();

        // Persist
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;

        let result = manifest.clone();
        Ok(result)
    }

    /// Get a dataset by ID.
    pub async fn get(&self, id: &DatasetId) -> Option<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets.get(id).cloned()
    }

    /// Get a dataset by name.
    pub async fn get_by_name(&self, name: &str) -> Option<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets.values().find(|d| d.name == name).cloned()
    }

    /// List all datasets.
    pub async fn list(&self) -> Vec<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets.values().cloned().collect()
    }

    /// Re-read the parquet object(s) for a dataset and repopulate `row_count`
    /// and `columns` from reality. Use this to repair manifests whose
    /// metadata was lost (e.g. migrated from a pre-Phase 10 catalog).
    ///
    /// Does NOT touch owner/description/sensitivity/lineage/tags — only
    /// the structural facts that parquet can tell us authoritatively.
    /// The existing `schema_fingerprint` is updated if the recomputed one
    /// differs; a warning is logged so drift is visible.
    pub async fn resync_from_parquet(&self, name: &str) -> Result<DatasetManifest, String> {
        use shared::arrow_helpers::{fingerprint_schema, parquet_to_record_batches};

        // Snapshot the target manifest so we don't hold the write lock during IO.
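        // Pattern: take a read-lock snapshot, drop it, do the parquet reads and
        // parsing lock-free, then re-acquire the write lock to apply results.
        // The manifest can change (or be deleted) between the two lock
        // acquisitions; the `get_mut` lookup below treats a vanished manifest
        // as an error rather than resurrecting it.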
        let (id, objects, old_fp) = {
            let datasets = self.datasets.read().await;
            let m = datasets
                .values()
                .find(|d| d.name == name)
                .ok_or_else(|| format!("dataset not found: {name}"))?;
            (m.id.clone(), m.objects.clone(), m.schema_fingerprint.clone())
        };

        if objects.is_empty() {
            return Err(format!("dataset '{name}' has no object references to resync from"));
        }

        let mut total_rows: u64 = 0;
        // Concrete schema type is whatever `parquet_to_record_batches` returns;
        // left to inference here.
        let mut first_schema: Option<_> = None;
        for obj in &objects {
            let data = ops::get(&self.store, &obj.key)
                .await
                .map_err(|e| format!("read {}: {e}", obj.key))?;
            let (schema, batches) = parquet_to_record_batches(&data)
                .map_err(|e| format!("parse {}: {e}", obj.key))?;
            let rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
            total_rows += rows;
            if first_schema.is_none() {
                first_schema = Some(schema);
            }
        }

        let schema = first_schema.ok_or("no schema recovered")?;
        let new_fp = fingerprint_schema(&schema);
        if new_fp != old_fp {
            tracing::warn!(
                "dataset '{}' schema fingerprint drift: {} -> {} (updating to match parquet reality)",
                name,
                old_fp.0,
                new_fp.0,
            );
        }

        let columns: Vec<ColumnMeta> = schema
            .fields()
            .iter()
            .map(|f| ColumnMeta {
                name: f.name().clone(),
                data_type: f.data_type().to_string(),
                sensitivity: None,
                description: String::new(),
                is_pii: false,
            })
            .collect();

        // Apply updates.
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .get_mut(&id)
            .ok_or_else(|| format!("dataset disappeared during resync: {name}"))?;
        manifest.row_count = Some(total_rows);
        manifest.columns = columns;
        manifest.schema_fingerprint = new_fp;
        manifest.updated_at = chrono::Utc::now();

        // Persist.
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;

        tracing::info!(
            "resynced '{name}': row_count={total_rows}, {} columns",
            manifest.columns.len()
        );
        Ok(manifest.clone())
    }

    /// Collapse duplicate manifests that share a `name`. Winner per group:
    /// 1. Prefer a manifest with a non-null `row_count` (already resynced).
    /// 2. Break ties by newest `updated_at`.
    /// Losers are removed from the in-memory registry AND their manifest
    /// JSON is deleted from object storage. Datasets with a single manifest
    /// are untouched. Parquet data files are never touched — only catalog
    /// metadata. See `register` for the prevention side of this fix.
    ///
    /// Operator-only. Do not run while ingest is active: a concurrent
    /// `register` between the snapshot and the delete sweep can create a
    /// new manifest under a name we just deduped, producing a transient
    /// count of 2 until the next dedupe run. Safe outside ingest windows.
    pub async fn dedupe_by_name(&self) -> DedupeReport {
        let groups: Vec<(String, Vec<DatasetManifest>)> = {
            let datasets = self.datasets.read().await;
            let mut by_name: HashMap<String, Vec<DatasetManifest>> = HashMap::new();
            for m in datasets.values() {
                by_name.entry(m.name.clone()).or_default().push(m.clone());
            }
            by_name.into_iter().filter(|(_, v)| v.len() > 1).collect()
        };

        let mut report = DedupeReport::default();
        let mut to_delete: Vec<DatasetId> = Vec::new();

        for (name, mut manifests) in groups {
            report.groups += 1;
            manifests.sort_by(|a, b| {
                b.row_count
                    .is_some()
                    .cmp(&a.row_count.is_some())
                    .then(b.updated_at.cmp(&a.updated_at))
            });
            let winner = &manifests[0];
            report.kept.push(DedupeKept {
                name: name.clone(),
                kept_id: winner.id.to_string(),
                removed: manifests.len() - 1,
            });
            for loser in &manifests[1..] {
                to_delete.push(loser.id.clone());
            }
        }

        {
            let mut datasets = self.datasets.write().await;
            for id in &to_delete {
                datasets.remove(id);
            }
        }

        for id in &to_delete {
            let key = format!("{MANIFEST_PREFIX}/{}.json", id);
            match ops::delete(&self.store, &key).await {
                Ok(_) => report.removed += 1,
                Err(e) => report.errors.push(DedupeError {
                    manifest_id: id.to_string(),
                    error: e,
                }),
            }
        }

        tracing::info!(
            "dedupe: {} groups collapsed, {} manifests removed, {} errors",
            report.groups,
            report.removed,
            report.errors.len(),
        );
        report
    }

    /// Resync every dataset that currently has a null row_count.
    /// Returns (successes, failures) where each entry is (name, detail).
    pub async fn resync_missing(&self) -> (Vec<(String, u64)>, Vec<(String, String)>) {
        let names: Vec<String> = {
            let datasets = self.datasets.read().await;
            datasets
                .values()
                .filter(|d| d.row_count.is_none() || d.columns.is_empty())
                .map(|d| d.name.clone())
                .collect()
        };
        let mut ok = Vec::new();
        let mut err = Vec::new();
        for name in names {
            match self.resync_from_parquet(&name).await {
                Ok(m) => ok.push((name, m.row_count.unwrap_or(0))),
                Err(e) => err.push((name, e)),
            }
        }
        (ok, err)
    }

    /// Mark a dataset's embeddings as stale (row-level data has been written
    /// since the last embedding refresh). Idempotent — setting stale when
    /// already stale is a no-op. Only marks stale if the dataset has been
    /// embedded before — a never-embedded dataset doesn't need a stale flag
    /// (it just needs an initial index build). Called from the ingest path.
    pub async fn mark_embeddings_stale(&self, name: &str) -> Result<(), String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .values_mut()
            .find(|d| d.name == name)
            .ok_or_else(|| format!("dataset not found: {name}"))?;
        if manifest.last_embedded_at.is_none() {
            return Ok(()); // never embedded -> no stale semantics yet
        }
        if manifest.embedding_stale_since.is_none() {
            manifest.embedding_stale_since = Some(chrono::Utc::now());
            manifest.updated_at = chrono::Utc::now();
            let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
            let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
            ops::put(&self.store, &key, json.into()).await?;
            tracing::info!("marked embeddings stale for dataset '{name}'");
        }
        Ok(())
    }

    /// Clear the stale marker and set `last_embedded_at = now`.
    /// Called by the embedding refresh pipeline once it finishes.
    pub async fn clear_embeddings_stale(&self, name: &str) -> Result<(), String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .values_mut()
            .find(|d| d.name == name)
            .ok_or_else(|| format!("dataset not found: {name}"))?;
        let now = chrono::Utc::now();
        manifest.embedding_stale_since = None;
        manifest.last_embedded_at = Some(now);
        manifest.updated_at = now;
        let key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &key, json.into()).await?;
        Ok(())
    }

    /// Federation layer 2: stamp `bucket = "primary"` on every ObjectRef
    /// whose `bucket` field is empty or matches a legacy value (`"data"`,
    /// `"local"`). One-shot migration; re-running is a safe no-op once
    /// every ref is canonical.
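    ///
    /// Operator sketch (illustrative only, hence `ignore`): run once after an
    /// upgrade and check that every examined ref was accounted for. Each ref
    /// is counted exactly once as renamed, stamped, or unchanged.
    ///
    /// ```ignore
    /// let report = registry.migrate_buckets_to_primary().await?;
    /// assert_eq!(
    ///     report.refs_renamed + report.refs_stamped + report.refs_unchanged,
    ///     report.refs_examined,
    /// );
    /// ```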
    pub async fn migrate_buckets_to_primary(&self) -> Result<MigrateBucketsReport, String> {
        let mut datasets = self.datasets.write().await;
        let mut report = MigrateBucketsReport::default();
        let mut to_persist: Vec<(DatasetId, DatasetManifest)> = Vec::new();

        for manifest in datasets.values_mut() {
            let mut changed = false;
            for obj in manifest.objects.iter_mut() {
                report.refs_examined += 1;
                if obj.bucket.is_empty() {
                    obj.bucket = "primary".to_string();
                    report.refs_stamped += 1;
                    changed = true;
                } else if obj.bucket == "data" || obj.bucket == "local" {
                    obj.bucket = "primary".to_string();
                    report.refs_renamed += 1;
                    changed = true;
                } else {
                    report.refs_unchanged += 1;
                }
            }
            if changed {
                manifest.updated_at = chrono::Utc::now();
                to_persist.push((manifest.id.clone(), manifest.clone()));
            }
        }

        // Persist updated manifests after we've finished mutating the map.
        for (id, manifest) in to_persist {
            let key = format!("{MANIFEST_PREFIX}/{}.json", id);
            let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
            ops::put(&self.store, &key, json.into()).await?;
            report.manifests_persisted += 1;
        }

        tracing::info!(
            "bucket migration: examined {} refs, renamed {}, stamped {}, unchanged {}, persisted {} manifests",
            report.refs_examined,
            report.refs_renamed,
            report.refs_stamped,
            report.refs_unchanged,
            report.manifests_persisted,
        );
        Ok(report)
    }

    // --- Phase D: AI-safe views ---

    /// Create or replace a named view. Validates that the base dataset
    /// exists and that the column whitelist is non-empty. Persists to
    /// `_catalog/views/{name}.json`.
    pub async fn put_view(&self, mut view: AiView) -> Result<AiView, String> {
        if view.name.is_empty() {
            return Err("view name is empty".into());
        }
        if view.columns.is_empty() {
            return Err("view must whitelist at least one column".into());
        }
        // Base dataset must exist (read-side will fail anyway, fail-fast here
        // so operators don't end up with dangling views).
        if self.get_by_name(&view.base_dataset).await.is_none() {
            return Err(format!(
                "base dataset '{}' not found in catalog",
                view.base_dataset
            ));
        }
        if view.created_at.timestamp() == 0 {
            view.created_at = chrono::Utc::now();
        }
        let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(&view.name));
        let json = serde_json::to_vec_pretty(&view).map_err(|e| e.to_string())?;
        ops::put(&self.store, &key, json.into()).await?;
        let mut views = self.views.write().await;
        views.insert(view.name.clone(), view.clone());
        tracing::info!("view registered: {} over '{}'", view.name, view.base_dataset);
        Ok(view)
    }

    pub async fn get_view(&self, name: &str) -> Option<AiView> {
        self.views.read().await.get(name).cloned()
    }

    pub async fn list_views(&self) -> Vec<AiView> {
        self.views.read().await.values().cloned().collect()
    }

    pub async fn delete_view(&self, name: &str) -> Result<(), String> {
        let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(name));
        ops::delete(&self.store, &key).await?;
        self.views.write().await.remove(name);
        Ok(())
    }

    /// Remove a dataset from the catalog by name. Deletes the manifest
    /// from both the in-memory registry and object storage.
    ///
    /// Scope is metadata-only: the underlying parquet files, vector
    /// indexes, tombstones, trial journals, and AiViews that reference
    /// this dataset are NOT touched. Caller is responsible for any
    /// cascade cleanup. This mirrors how a DROP TABLE in a typical
    /// warehouse separates "forget about this dataset" from "actually
    /// reclaim the bytes".
    ///
    /// Returns `Ok(n)` with the number of manifests removed when the
    /// dataset existed; an Err with "dataset not found" when no manifest
    /// by that name exists.
    /// Legacy state with duplicate names: all matching manifests are
    /// removed (effectively a post-hoc dedupe on that name).
    pub async fn delete_dataset(&self, name: &str) -> Result<usize, String> {
        let ids_to_remove: Vec<DatasetId> = {
            let datasets = self.datasets.read().await;
            datasets.values().filter(|d| d.name == name).map(|d| d.id.clone()).collect()
        };
        if ids_to_remove.is_empty() {
            return Err(format!("dataset not found: {name}"));
        }

        for id in &ids_to_remove {
            let key = format!("{MANIFEST_PREFIX}/{}.json", id);
            if let Err(e) = ops::delete(&self.store, &key).await {
                // Storage delete failed — log and keep going. The in-memory
                // remove below is still correct; on restart, the missing
                // storage object just means nothing to rehydrate.
                tracing::warn!("delete_dataset '{name}': storage delete of {key} failed: {e}");
            }
        }

        let mut datasets = self.datasets.write().await;
        for id in &ids_to_remove {
            datasets.remove(id);
        }
        tracing::info!("deleted dataset '{}' ({} manifest(s))", name, ids_to_remove.len());
        Ok(ids_to_remove.len())
    }

    /// List datasets whose `embedding_stale_since` is set — they need a refresh.
    pub async fn stale_datasets(&self) -> Vec<DatasetManifest> {
        let datasets = self.datasets.read().await;
        datasets
            .values()
            .filter(|d| d.embedding_stale_since.is_some())
            .cloned()
            .collect()
    }

    /// Add objects to an existing dataset.
    pub async fn add_objects(
        &self,
        id: &DatasetId,
        new_objects: Vec<ObjectRef>,
    ) -> Result<DatasetManifest, String> {
        let mut datasets = self.datasets.write().await;
        let manifest = datasets
            .get_mut(id)
            .ok_or_else(|| format!("dataset not found: {id}"))?;
        manifest.objects.extend(new_objects);
        manifest.updated_at = chrono::Utc::now();

        // Persist updated manifest
        let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
        let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
        ops::put(&self.store, &manifest_key, json.into()).await?;
        Ok(manifest.clone())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    fn fixture() -> Registry {
        Registry::new(Arc::new(InMemory::new()))
    }

    fn fp(s: &str) -> SchemaFingerprint {
        SchemaFingerprint(s.to_string())
    }

    fn obj(key: &str) -> ObjectRef {
        ObjectRef {
            bucket: "primary".to_string(),
            key: key.to_string(),
            size_bytes: 100,
            created_at: chrono::Utc::now(),
        }
    }

    #[tokio::test]
    async fn fresh_register_creates_new_manifest() {
        let reg = fixture();
        let m = reg.register("a".into(), fp("f1"), vec![obj("a.parquet")]).await.unwrap();
        assert_eq!(m.name, "a");
        assert_eq!(m.schema_fingerprint, fp("f1"));
        assert_eq!(reg.list().await.len(), 1);
        assert_eq!(reg.get_by_name("a").await.unwrap().id, m.id);
    }

    #[tokio::test]
    async fn re_register_same_fingerprint_is_idempotent() {
        let reg = fixture();
        let first = reg.register("a".into(), fp("f1"), vec![obj("old.parquet")]).await.unwrap();
        let second = reg.register("a".into(), fp("f1"), vec![obj("new.parquet")]).await.unwrap();
        assert_eq!(first.id, second.id, "same name+fp should reuse ID");
        assert_eq!(reg.list().await.len(), 1, "no duplicate manifest created");
        let fetched = reg.get_by_name("a").await.unwrap();
        assert_eq!(fetched.objects[0].key, "new.parquet", "objects replaced");
        assert!(fetched.updated_at >= first.updated_at, "updated_at bumped");
    }

    #[tokio::test]
    async fn re_register_different_fingerprint_rejects() {
        let reg = fixture();
        reg.register("a".into(), fp("f1"), vec![obj("a.parquet")]).await.unwrap();
        let err = reg.register("a".into(), fp("f2"), vec![obj("a.parquet")]).await.unwrap_err();
        assert!(err.contains("different schema"), "error mentions schema drift: {err}");
        assert_eq!(reg.list().await.len(), 1, "failed register did not mutate state");
        assert_eq!(reg.get_by_name("a").await.unwrap().schema_fingerprint, fp("f1"));
    }

    #[tokio::test]
    async fn delete_dataset_removes_manifest_and_in_memory_entry() {
        let reg = fixture();
        reg.register("to_delete".into(), fp("f1"), vec![obj("d.parquet")]).await.unwrap();
        reg.register("keepme".into(), fp("f2"), vec![obj("k.parquet")]).await.unwrap();
        assert_eq!(reg.list().await.len(), 2);

        let removed = reg.delete_dataset("to_delete").await.unwrap();
        assert_eq!(removed, 1, "one manifest removed");
        assert!(reg.get_by_name("to_delete").await.is_none());
        assert!(reg.get_by_name("keepme").await.is_some(), "unrelated dataset untouched");
        assert_eq!(reg.list().await.len(), 1);
    }

    #[tokio::test]
    async fn delete_dataset_404s_on_unknown_name() {
        let reg = fixture();
        let err = reg.delete_dataset("nope").await.unwrap_err();
        assert!(err.starts_with("dataset not found"), "error wording: {err}");
    }

    #[tokio::test]
    async fn delete_dataset_removes_all_dupes_with_matching_name() {
        // Legacy zombies: direct insert so we can seed duplicates,
        // mirroring pre-idempotent-register state. delete_dataset should
        // sweep every manifest sharing that name in a single call.
        let reg = fixture();
        let now = chrono::Utc::now();
        for _ in 0..3 {
            let m = DatasetManifest {
                id: DatasetId::new(),
                name: "zombie".into(),
                schema_fingerprint: fp("fp"),
                objects: vec![obj("z.parquet")],
                created_at: now,
                updated_at: now,
                row_count: None,
                description: String::new(),
                owner: String::new(),
                sensitivity: None,
                columns: vec![],
                lineage: None,
                freshness: None,
                tags: vec![],
                last_embedded_at: None,
                embedding_stale_since: None,
                embedding_refresh_policy: None,
            };
            reg.datasets.write().await.insert(m.id.clone(), m);
        }
        assert_eq!(reg.list().await.len(), 3);

        let removed = reg.delete_dataset("zombie").await.unwrap();
        assert_eq!(removed, 3, "all three manifests removed in one sweep");
        assert_eq!(reg.list().await.len(), 0);
    }

    #[tokio::test]
    async fn dedupe_no_duplicates_is_noop() {
        let reg = fixture();
        reg.register("a".into(), fp("f1"), vec![obj("a.parquet")]).await.unwrap();
        reg.register("b".into(), fp("f2"), vec![obj("b.parquet")]).await.unwrap();
        let report = reg.dedupe_by_name().await;
        assert_eq!(report.groups, 0);
        assert_eq!(report.removed, 0);
        assert_eq!(reg.list().await.len(), 2);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn concurrent_register_stress_many_workers_one_manifest() {
        // Stress variant: fire 32 concurrent registers for the same new
        // name across 8 worker threads. Single-manifest invariant must
        // hold regardless of how the scheduler interleaves them. This
        // catches race regressions the 2-call variant below might miss
        // under a forgiving scheduler.
        let reg = fixture();
        let mut handles = Vec::new();
        for i in 0..32 {
            let r = reg.clone();
            handles.push(tokio::spawn(async move {
                r.register(
                    "stress".into(),
                    fp("stress-fp"),
                    vec![obj(&format!("{i}.parquet"))],
                ).await
            }));
        }
        for h in handles {
            h.await.unwrap().expect("each register succeeds (idempotent)");
        }

        let all = reg.list().await;
        let stress_manifests: Vec<_> = all.iter().filter(|d| d.name == "stress").collect();
        assert_eq!(
            stress_manifests.len(),
            1,
            "32 concurrent registers produced {} manifests for 'stress' — expected 1",
            stress_manifests.len()
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_register_same_new_name_collapses_to_one_manifest() {
        // Pins the TOCTOU fix: without the write-lock held across the
        // check→insert sequence, two parallel registers of the same
        // previously-unknown name would both see "no existing" and both
        // insert. Multi-thread flavor is required to actually exercise
        // the race — under single-threaded tokio the scheduler doesn't
        // preempt between awaits on the same task's critical section.
        // Under the current impl, whichever acquires the write lock
        // first creates the manifest; the second observes it and takes
        // the idempotent-update branch.
        let reg = fixture();
        let a = reg.clone();
        let b = reg.clone();
        let (ra, rb) = tokio::join!(
            async move { a.register("race".into(), fp("shared-fp"), vec![obj("a.parquet")]).await },
            async move { b.register("race".into(), fp("shared-fp"), vec![obj("b.parquet")]).await },
        );
        let m_a = ra.expect("first register succeeds");
        let m_b = rb.expect("second register succeeds (same fp = idempotent)");
        assert_eq!(m_a.id, m_b.id, "both calls resolve to a single DatasetId");
        assert_eq!(reg.list().await.len(), 1, "no duplicate manifest created");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_register_same_name_different_fp_one_wins_one_rejects() {
        // Reverse case: concurrent registers with conflicting fingerprints.
        // Deterministic outcome: one call's fingerprint gets persisted as
        // canonical, the other is rejected on seeing the now-established
        // fingerprint. Which one wins is scheduler-dependent — we only
        // assert exactly one success and exactly one rejection.
        let reg = fixture();
        let a = reg.clone();
        let b = reg.clone();
        let (ra, rb) = tokio::join!(
            async move { a.register("race".into(), fp("fp1"), vec![obj("a.parquet")]).await },
            async move { b.register("race".into(), fp("fp2"), vec![obj("b.parquet")]).await },
        );
        let successes = [&ra, &rb].iter().filter(|r| r.is_ok()).count();
        let rejections = [&ra, &rb].iter().filter(|r| r.is_err()).count();
        assert_eq!(successes, 1, "exactly one register wins");
        assert_eq!(rejections, 1, "the other is rejected on fingerprint");
        let rejection_msg = [&ra, &rb].iter().find_map(|r| r.as_ref().err()).unwrap();
        assert!(rejection_msg.contains("different schema"));
        assert_eq!(reg.list().await.len(), 1);
    }

    #[tokio::test]
    async fn dedupe_collapses_dupes_preferring_non_null_row_count() {
        let reg = fixture();
        // Simulate legacy pre-idempotent-register state by inserting
        // manifests with the same `name` directly. Mirrors what happened
        // with the 308× successful_playbooks duplication.
        let now = chrono::Utc::now();
        let old_with_rows = DatasetManifest {
            id: DatasetId::new(),
            name: "dupes".into(),
            schema_fingerprint: fp("f1"),
            objects: vec![obj("dupes.parquet")],
            created_at: now - chrono::Duration::hours(2),
            updated_at: now - chrono::Duration::hours(2),
            row_count: Some(42),
            description: String::new(),
            owner: String::new(),
            sensitivity: None,
            columns: vec![],
            lineage: None,
            freshness: None,
            tags: vec![],
            last_embedded_at: None,
            embedding_stale_since: None,
            embedding_refresh_policy: None,
        };
        let newer_no_rows = DatasetManifest {
            id: DatasetId::new(),
            updated_at: now, // newer, but null row_count
            row_count: None,
            ..old_with_rows.clone()
        };
        let oldest = DatasetManifest {
            id: DatasetId::new(),
            updated_at: now - chrono::Duration::hours(5),
            row_count: None,
            ..old_with_rows.clone()
        };
        let kept_id = old_with_rows.id.clone();
        let loser_a = newer_no_rows.id.clone();
        let loser_b = oldest.id.clone();
        {
            let mut d = reg.datasets.write().await;
            d.insert(old_with_rows.id.clone(), old_with_rows);
            d.insert(newer_no_rows.id.clone(), newer_no_rows);
            d.insert(oldest.id.clone(), oldest);
        }
        assert_eq!(reg.list().await.len(), 3);

        let report = reg.dedupe_by_name().await;
        assert_eq!(report.groups, 1);
        assert_eq!(report.removed, 2);
        assert_eq!(report.errors.len(), 0);
        assert_eq!(report.kept.len(), 1);
        assert_eq!(report.kept[0].name, "dupes");
        assert_eq!(
            report.kept[0].kept_id,
            kept_id.to_string(),
            "should keep the manifest with non-null row_count"
        );

        let remaining = reg.list().await;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, kept_id);
        assert!(reg.get(&loser_a).await.is_none());
        assert!(reg.get(&loser_b).await.is_none());
    }
}
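
// Illustrative extra coverage (a sketch, not part of the original suite):
// exercises two behaviours documented above, `add_tombstone` rejecting an
// unknown dataset and `mark_embeddings_stale` being a no-op for a dataset
// that has never been embedded. Uses the same in-memory fixture pattern as
// `mod tests`; names and literal values here are illustrative only.
#[cfg(test)]
mod doc_example_tests {
    use super::*;
    use object_store::memory::InMemory;

    fn fixture() -> Registry {
        Registry::new(Arc::new(InMemory::new()))
    }

    #[tokio::test]
    async fn add_tombstone_rejects_unknown_dataset() {
        let reg = fixture();
        let err = reg
            .add_tombstone("nope", "user_id", "42", "ops", "erasure request")
            .await
            .unwrap_err();
        assert!(err.starts_with("dataset not found"), "error wording: {err}");
    }

    #[tokio::test]
    async fn mark_embeddings_stale_is_noop_before_first_embed() {
        let reg = fixture();
        reg.register("fresh".into(), SchemaFingerprint("fp".to_string()), vec![])
            .await
            .unwrap();
        // Never embedded: no stale flag should be set.
        reg.mark_embeddings_stale("fresh").await.unwrap();
        let m = reg.get_by_name("fresh").await.unwrap();
        assert!(m.embedding_stale_since.is_none());
        assert!(m.last_embedded_at.is_none());
    }
}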