use shared::types::{ DatasetId, DatasetManifest, ObjectRef, SchemaFingerprint, ColumnMeta, Lineage, FreshnessContract, Sensitivity, }; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; use storaged::ops; use object_store::ObjectStore; /// Partial metadata update — only set fields are applied. #[derive(Debug, Clone, Default, serde::Deserialize)] pub struct MetadataUpdate { pub description: Option, pub owner: Option, pub sensitivity: Option, pub tags: Option>, pub columns: Option>, pub lineage: Option, pub freshness: Option, pub row_count: Option, } const MANIFEST_PREFIX: &str = "_catalog/manifests"; /// In-memory dataset registry backed by manifest persistence in object storage. #[derive(Clone)] pub struct Registry { datasets: Arc>>, store: Arc, } impl Registry { pub fn new(store: Arc) -> Self { Self { datasets: Arc::new(RwLock::new(HashMap::new())), store, } } /// Rebuild in-memory index from persisted manifests on startup. pub async fn rebuild(&self) -> Result { let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?; let mut datasets = self.datasets.write().await; datasets.clear(); for key in &keys { let data = ops::get(&self.store, key).await?; let manifest: DatasetManifest = serde_json::from_slice(&data).map_err(|e| e.to_string())?; datasets.insert(manifest.id.clone(), manifest); } let count = datasets.len(); tracing::info!("catalog rebuilt: {count} datasets loaded"); Ok(count) } /// Register a new dataset. Persists manifest to storage before updating memory. pub async fn register( &self, name: String, schema_fingerprint: SchemaFingerprint, objects: Vec, ) -> Result { let now = chrono::Utc::now(); let manifest = DatasetManifest { id: DatasetId::new(), name, schema_fingerprint, objects, created_at: now, updated_at: now, description: String::new(), owner: String::new(), sensitivity: None, columns: vec![], lineage: None, freshness: None, tags: vec![], row_count: None, }; // Write-ahead: persist before in-memory update let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id); let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?; ops::put(&self.store, &manifest_key, json.into()).await?; let mut datasets = self.datasets.write().await; datasets.insert(manifest.id.clone(), manifest.clone()); tracing::info!("registered dataset: {} ({})", manifest.name, manifest.id); Ok(manifest) } /// Update metadata on an existing dataset (owner, description, tags, sensitivity, etc.) pub async fn update_metadata( &self, name: &str, updates: MetadataUpdate, ) -> Result { let mut datasets = self.datasets.write().await; let manifest = datasets.values_mut() .find(|d| d.name == name) .ok_or_else(|| format!("dataset not found: {name}"))?; if let Some(desc) = updates.description { manifest.description = desc; } if let Some(owner) = updates.owner { manifest.owner = owner; } if let Some(sens) = updates.sensitivity { manifest.sensitivity = Some(sens); } if let Some(tags) = updates.tags { manifest.tags = tags; } if let Some(cols) = updates.columns { manifest.columns = cols; } if let Some(lineage) = updates.lineage { manifest.lineage = Some(lineage); } if let Some(freshness) = updates.freshness { manifest.freshness = Some(freshness); } if let Some(count) = updates.row_count { manifest.row_count = Some(count); } manifest.updated_at = chrono::Utc::now(); // Persist let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id); let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?; ops::put(&self.store, &manifest_key, json.into()).await?; let result = manifest.clone(); Ok(result) } /// Get a dataset by ID. pub async fn get(&self, id: &DatasetId) -> Option { let datasets = self.datasets.read().await; datasets.get(id).cloned() } /// Get a dataset by name. pub async fn get_by_name(&self, name: &str) -> Option { let datasets = self.datasets.read().await; datasets.values().find(|d| d.name == name).cloned() } /// List all datasets. pub async fn list(&self) -> Vec { let datasets = self.datasets.read().await; datasets.values().cloned().collect() } /// Re-read the parquet footer(s) for a dataset and repopulate `row_count` /// and `columns` from reality. Use this to repair manifests whose /// metadata was lost (e.g. migrated from a pre-Phase 10 catalog). /// /// Does NOT touch owner/description/sensitivity/lineage/tags — only /// the structural facts that parquet can tell us authoritatively. /// The existing `schema_fingerprint` is updated if the recomputed one /// differs; a warning is logged so drift is visible. pub async fn resync_from_parquet(&self, name: &str) -> Result { use shared::arrow_helpers::{fingerprint_schema, parquet_to_record_batches}; // Snapshot the target manifest so we don't hold the write lock during IO. let (id, objects, old_fp) = { let datasets = self.datasets.read().await; let m = datasets .values() .find(|d| d.name == name) .ok_or_else(|| format!("dataset not found: {name}"))?; (m.id.clone(), m.objects.clone(), m.schema_fingerprint.clone()) }; if objects.is_empty() { return Err(format!("dataset '{name}' has no object references to resync from")); } let mut total_rows: u64 = 0; let mut first_schema: Option = None; for obj in &objects { let data = ops::get(&self.store, &obj.key).await .map_err(|e| format!("read {}: {e}", obj.key))?; let (schema, batches) = parquet_to_record_batches(&data) .map_err(|e| format!("parse {}: {e}", obj.key))?; let rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum(); total_rows += rows; if first_schema.is_none() { first_schema = Some(schema); } } let schema = first_schema.ok_or("no schema recovered")?; let new_fp = fingerprint_schema(&schema); if new_fp != old_fp { tracing::warn!( "dataset '{}' schema fingerprint drift: {} -> {} (updating to match parquet reality)", name, old_fp.0, new_fp.0, ); } let columns: Vec = schema .fields() .iter() .map(|f| ColumnMeta { name: f.name().clone(), data_type: f.data_type().to_string(), sensitivity: None, description: String::new(), is_pii: false, }) .collect(); // Apply updates. let mut datasets = self.datasets.write().await; let manifest = datasets .get_mut(&id) .ok_or_else(|| format!("dataset disappeared during resync: {name}"))?; manifest.row_count = Some(total_rows); manifest.columns = columns; manifest.schema_fingerprint = new_fp; manifest.updated_at = chrono::Utc::now(); // Persist. let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id); let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?; ops::put(&self.store, &manifest_key, json.into()).await?; tracing::info!("resynced '{name}': row_count={total_rows}, {} columns", manifest.columns.len()); Ok(manifest.clone()) } /// Resync every dataset that currently has a null row_count. /// Returns (successes, failures) where each entry is (name, detail). pub async fn resync_missing(&self) -> (Vec<(String, u64)>, Vec<(String, String)>) { let names: Vec = { let datasets = self.datasets.read().await; datasets .values() .filter(|d| d.row_count.is_none() || d.columns.is_empty()) .map(|d| d.name.clone()) .collect() }; let mut ok = Vec::new(); let mut err = Vec::new(); for name in names { match self.resync_from_parquet(&name).await { Ok(m) => ok.push((name, m.row_count.unwrap_or(0))), Err(e) => err.push((name, e)), } } (ok, err) } /// Add objects to an existing dataset. pub async fn add_objects( &self, id: &DatasetId, new_objects: Vec, ) -> Result { let mut datasets = self.datasets.write().await; let manifest = datasets.get_mut(id).ok_or_else(|| format!("dataset not found: {id}"))?; manifest.objects.extend(new_objects); manifest.updated_at = chrono::Utc::now(); // Persist updated manifest let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id); let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?; ops::put(&self.store, &manifest_key, json.into()).await?; Ok(manifest.clone()) } }