diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs index 8113552..707709d 100644 --- a/crates/catalogd/src/registry.rs +++ b/crates/catalogd/src/registry.rs @@ -1,6 +1,6 @@ use shared::types::{ AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ModelProfile, - ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone, + ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, SubjectManifest, Tombstone, }; use crate::tombstones::TombstoneStore; @@ -66,16 +66,20 @@ pub struct MetadataUpdate { const MANIFEST_PREFIX: &str = "_catalog/manifests"; const VIEW_PREFIX: &str = "_catalog/views"; +const SUBJECT_PREFIX: &str = "_catalog/subjects"; const PROFILE_PREFIX: &str = "_catalog/profiles"; /// In-memory dataset registry backed by manifest persistence in object storage. /// Also tracks AiViews (Phase D) — safe projections over base datasets. /// And tombstones (Phase E) — soft-delete markers applied at query time. +/// And subject manifests (compliance/access metadata layer) — see +/// docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md. #[derive(Clone)] pub struct Registry { datasets: Arc>>, views: Arc>>, profiles: Arc>>, + subjects: Arc>>, tombstones: TombstoneStore, store: Arc, } @@ -86,6 +90,7 @@ impl Registry { datasets: Arc::new(RwLock::new(HashMap::new())), views: Arc::new(RwLock::new(HashMap::new())), profiles: Arc::new(RwLock::new(HashMap::new())), + subjects: Arc::new(RwLock::new(HashMap::new())), tombstones: TombstoneStore::new(store.clone()), store, } @@ -179,6 +184,28 @@ impl Registry { tracing::info!("catalog: {} model profiles loaded", profiles.len()); } + // Subject manifests (compliance/access metadata layer). 
+        // Reload every subject manifest found under SUBJECT_PREFIX.
+        // A failed listing is logged and then treated as "no manifests";
+        // the in-memory map is still cleared below, so a listing failure
+        // leaves the registry with zero subjects until the next rebuild.
+        let subject_keys = match ops::list(&self.store, Some(SUBJECT_PREFIX)).await {
+            Ok(keys) => keys,
+            Err(e) => {
+                tracing::warn!("subject manifests: list failed: {e}");
+                Default::default()
+            }
+        };
+        // The write guard is taken before the reads below and kept while
+        // they run, so inserts made through the same lock cannot interleave
+        // with the reload.
+        let mut subjects = self.subjects.write().await;
+        subjects.clear();
+        for key in &subject_keys {
+            // Only `*.json` objects are manifests; anything else under the
+            // prefix is skipped.
+            if !key.ends_with(".json") {
+                continue;
+            }
+            let data = match ops::get(&self.store, key).await {
+                Ok(d) => d,
+                Err(e) => {
+                    tracing::warn!("subject '{key}': read failed: {e}");
+                    continue;
+                }
+            };
+            // Unreadable or unparsable manifests are logged and skipped
+            // rather than failing the whole rebuild. The map is keyed by the
+            // manifest's own `candidate_id`, not by the object key.
+            match serde_json::from_slice::<SubjectManifest>(&data) {
+                Ok(subj) => {
+                    subjects.insert(subj.candidate_id.clone(), subj);
+                }
+                Err(e) => tracing::warn!("subject '{key}': parse failed: {e}"),
+            }
+        }
+        if !subjects.is_empty() {
+            tracing::info!("catalog: {} subject manifests loaded", subjects.len());
+        }
+
         Ok(count)
     }
 
@@ -686,6 +713,64 @@ impl Registry {
         Ok(())
     }
 
+    // --- Subject manifests (compliance/access metadata layer) ---
+    // Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md
+    // Mirrors the put_view / get_view / list_views / delete_view shape.
+    // The audit log per subject is owned separately by SubjectAuditWriter
+    // (Step 2) — this layer just persists the manifest itself.
+
+    /// Create or replace a subject manifest. Validates that referenced
+    /// datasets exist in the catalog (fail-fast so dangling references
+    /// don't accumulate). Persists to `_catalog/subjects/<candidate_id>.json`.
+    ///
+    /// Returns the manifest as stored (timestamps filled in).
+    ///
+    /// # Errors
+    /// Returns `Err` when `candidate_id` is empty, when any entry in
+    /// `datasets` names a dataset that `get_by_name` cannot find, when the
+    /// manifest cannot be serialized, or when the object-store write fails.
+    /// In every error case the in-memory map is left untouched.
+    pub async fn put_subject(&self, mut subj: SubjectManifest) -> Result<SubjectManifest, String> {
+        if subj.candidate_id.is_empty() {
+            return Err("subject candidate_id is empty".into());
+        }
+        // Stop at the first dataset reference that does not resolve.
+        for ds in &subj.datasets {
+            if self.get_by_name(&ds.name).await.is_none() {
+                return Err(format!(
+                    "subject '{}' references dataset '{}' which is not in the catalog",
+                    subj.candidate_id, ds.name,
+                ));
+            }
+        }
+        // An epoch-zero `created_at` is treated as "unset" and stamped with
+        // the current time; `updated_at` is refreshed on every call.
+        let now = chrono::Utc::now();
+        if subj.created_at.timestamp() == 0 {
+            subj.created_at = now;
+        }
+        subj.updated_at = now;
+
+        // Persist before touching the map so a failed write never leaves a
+        // memory-only entry.
+        // NOTE(review): the object key uses the sanitized id while the map
+        // uses the raw id — confirm `sanitize_view_name` cannot map two
+        // distinct candidate ids onto the same key.
+        let key = format!("{SUBJECT_PREFIX}/{}.json", sanitize_view_name(&subj.candidate_id));
+        let json = serde_json::to_vec_pretty(&subj).map_err(|e| e.to_string())?;
+        ops::put(&self.store, &key, json.into()).await?;
+
+        // The write guard lives only for this statement.
+        self.subjects
+            .write()
+            .await
+            .insert(subj.candidate_id.clone(), subj.clone());
+        tracing::debug!("subject manifest persisted: {}", subj.candidate_id);
+        Ok(subj)
+    }
+
+    /// Returns a clone of the in-memory manifest for `candidate_id`, or
+    /// `None` when no such subject is loaded. Does not read the object store.
+    pub async fn get_subject(&self, candidate_id: &str) -> Option<SubjectManifest> {
+        self.subjects.read().await.get(candidate_id).cloned()
+    }
+
+    /// Returns clones of all in-memory subject manifests, in the map's
+    /// (unspecified) iteration order.
+    pub async fn list_subjects(&self) -> Vec<SubjectManifest> {
+        self.subjects.read().await.values().cloned().collect()
+    }
+
+    /// Deletes the persisted manifest for `candidate_id`, then drops the
+    /// in-memory entry.
+    ///
+    /// # Errors
+    /// Propagates the object-store delete error; in that case the
+    /// in-memory entry is kept.
+    pub async fn delete_subject(&self, candidate_id: &str) -> Result<(), String> {
+        let key = format!("{SUBJECT_PREFIX}/{}.json", sanitize_view_name(candidate_id));
+        ops::delete(&self.store, &key).await?;
+        self.subjects.write().await.remove(candidate_id);
+        Ok(())
+    }
+
+    /// Subject count — convenience for the daily retention sweep + audit
+    /// dashboards that don't need the full list.
+    pub async fn subjects_count(&self) -> usize {
+        self.subjects.read().await.len()
+    }
+
     /// Remove a dataset from the catalog by name. Deletes the manifest
     /// from both the in-memory registry and object storage.
///
@@ -1027,4 +1112,114 @@ mod tests {
         assert!(reg.get(&loser_a).await.is_none());
         assert!(reg.get(&loser_b).await.is_none());
     }
+
+    // --- Subject manifests (Step 1 of subject_manifests_on_catalogd) ---
+
+    /// Builds an active manifest for `id` with no dataset or view
+    /// references, general-PII consent pending backfill review, and no
+    /// biometric data collected.
+    fn fixture_subject(id: &str) -> shared::types::SubjectManifest {
+        use shared::types::{
+            BiometricConsent, BiometricConsentStatus, ConsentStatus,
+            GeneralPiiConsent, SubjectConsent, SubjectManifest, SubjectRetention,
+            SubjectStatus, SubjectVertical,
+        };
+
+        let general_pii = GeneralPiiConsent {
+            status: ConsentStatus::PendingBackfillReview,
+            version: String::new(),
+            given_at: None,
+            withdrawn_at: None,
+        };
+        let biometric = BiometricConsent {
+            status: BiometricConsentStatus::NeverCollected,
+            retention_until: None,
+        };
+
+        SubjectManifest {
+            schema: "subject_manifest.v1".into(),
+            candidate_id: id.to_owned(),
+            created_at: chrono::Utc::now(),
+            updated_at: chrono::Utc::now(),
+            status: SubjectStatus::Active,
+            vertical: SubjectVertical::Unknown,
+            consent: SubjectConsent { general_pii, biometric },
+            retention: SubjectRetention {
+                general_pii_until: chrono::Utc::now() + chrono::Duration::days(365 * 4),
+                policy: "4_year_default".into(),
+            },
+            datasets: Vec::new(),
+            safe_views: Vec::new(),
+            audit_log_path: String::new(),
+            audit_log_chain_root: String::new(),
+        }
+    }
+
+    #[tokio::test]
+    async fn put_subject_with_no_dataset_refs_succeeds() {
+        let registry = fixture();
+        let saved = registry
+            .put_subject(fixture_subject("CAND-000001"))
+            .await
+            .unwrap();
+        assert_eq!(saved.candidate_id, "CAND-000001");
+        assert_eq!(registry.subjects_count().await, 1);
+    }
+
+    #[tokio::test]
+    async fn put_subject_rejects_dangling_dataset_ref() {
+        use shared::types::SubjectDatasetRef;
+        let registry = fixture();
+        // No dataset is registered, so this reference cannot resolve.
+        let dangling = SubjectDatasetRef {
+            name: "no_such_dataset".into(),
+            key_column: "candidate_id".into(),
+            key_value: "CAND-000002".into(),
+        };
+        let mut subject = fixture_subject("CAND-000002");
+        subject.datasets.push(dangling);
+        let err = registry.put_subject(subject).await.unwrap_err();
+        assert!(err.contains("not in the catalog"), "got: {err}");
+    }
+
+    #[tokio::test]
+    async fn put_subject_with_valid_dataset_ref_succeeds() {
+        use shared::types::SubjectDatasetRef;
+        let registry = fixture();
+        // Register a dataset first so the subject's reference resolves.
+        registry
+            .register("workers".into(), fp("w1"), vec![obj("w.parquet")])
+            .await
+            .unwrap();
+
+        let workers_ref = SubjectDatasetRef {
+            name: "workers".into(),
+            key_column: "candidate_id".into(),
+            key_value: "CAND-000003".into(),
+        };
+        let mut subject = fixture_subject("CAND-000003");
+        subject.datasets.push(workers_ref);
+        let saved = registry.put_subject(subject).await.unwrap();
+        assert_eq!(saved.datasets.len(), 1);
+        assert_eq!(saved.datasets[0].name, "workers");
+    }
+
+    #[tokio::test]
+    async fn subject_round_trips_through_object_store() {
+        let store = Arc::new(object_store::memory::InMemory::new());
+        let writer = Registry::new(store.clone());
+        writer.put_subject(fixture_subject("CAND-RT-001")).await.unwrap();
+
+        // Open a fresh registry over the same store and rebuild → confirms
+        // persistence + the rebuild() load path actually round-trips the
+        // manifest.
+        let reader = Registry::new(store);
+        reader.rebuild().await.unwrap();
+        let loaded = reader.get_subject("CAND-RT-001").await;
+        assert!(loaded.is_some(), "subject not loaded from store");
+        assert_eq!(loaded.unwrap().status, shared::types::SubjectStatus::Active);
+    }
+
+    #[tokio::test]
+    async fn delete_subject_removes_in_memory_and_persistence() {
+        let store = Arc::new(object_store::memory::InMemory::new());
+        let reg = Registry::new(store.clone());
+        reg.put_subject(fixture_subject("CAND-DEL-001")).await.unwrap();
+        assert_eq!(reg.subjects_count().await, 1);
+
+        reg.delete_subject("CAND-DEL-001").await.unwrap();
+        assert_eq!(reg.subjects_count().await, 0);
+
+        // Fresh registry over same store — confirm it's also gone from persistence.
+        let reg2 = Registry::new(store);
+        reg2.rebuild().await.unwrap();
+        assert_eq!(reg2.subjects_count().await, 0);
+    }
 }
diff --git a/crates/shared/src/types.rs b/crates/shared/src/types.rs
index c22749a..2d3465a 100644
--- a/crates/shared/src/types.rs
+++ b/crates/shared/src/types.rs
@@ -364,3 +364,149 @@ pub enum Redaction {
         keep_suffix: usize,
     },
 }
+
+// ─── Subject manifests (compliance/access metadata layer) ────────────
+// Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md
+//
+// A SubjectManifest is a per-person record threading existing catalogd
+// primitives (datasets, views, tombstones) together by subject token.
+// Stored at _catalog/subjects/<candidate_id>.json. NOT a separate
+// daemon — extends catalogd's existing manifest pattern.
+//
+// The audit log per subject (HMAC-chained JSONL at
+// _catalog/subjects/<candidate_id>.audit.jsonl) is owned by a separate
+// SubjectAuditWriter (Step 2) — not modeled here.
+
+/// Lifecycle state of a subject manifest. Serialized in snake_case
+/// (e.g. `retention_expired`).
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum SubjectStatus {
+    PendingConsent,
+    Active,
+    Withdrawn,
+    RetentionExpired,
+    Erased,
+}
+
+/// Vertical drives healthcare-PHI routing. Default Unknown is fail-closed:
+/// the gateway treats Unknown as healthcare-equivalent until reclassified
+/// (per docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §2.1).
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum SubjectVertical {
+    Unknown,
+    General,
+    Healthcare,
+    Finance,
+    Other,
+}
+
+/// `Unknown` is the default, so a manifest deserialized without a
+/// `vertical` field (it is `#[serde(default)]` on `SubjectManifest`)
+/// gets `Unknown`.
+impl Default for SubjectVertical {
+    fn default() -> Self {
+        SubjectVertical::Unknown
+    }
+}
+
+/// State machine for general-PII consent. BIPA-specific biometric
+/// consent is tracked separately on `BiometricConsent`.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum ConsentStatus {
+    /// Backfilled from existing data sources before consent was tracked.
+    /// SHOULD route as if no consent exists until reviewed.
+    PendingBackfillReview,
+    /// New subject; awaiting consent UX.
+    PendingFirstContact,
+    Given,
+    Withdrawn,
+    Expired,
+}
+
+/// Biometric consent state, tracked separately from general-PII consent.
+/// Serialized in snake_case.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum BiometricConsentStatus {
+    /// Default. No biometric data exists for this subject.
+    NeverCollected,
+    Pending,
+    Given,
+    Withdrawn,
+    Expired,
+}
+
+/// General-PII consent record: current status plus the template version
+/// and the timestamps at which consent was given / withdrawn.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct GeneralPiiConsent {
+    pub status: ConsentStatus,
+    /// Reference to the consent template version (matches a row in the
+    /// future consent_versions table). Empty when status is pending_*.
+    #[serde(default)]
+    pub version: String,
+    /// Absent in the serialized form deserializes to `None`.
+    #[serde(default)]
+    pub given_at: Option<chrono::DateTime<chrono::Utc>>,
+    /// Absent in the serialized form deserializes to `None`.
+    #[serde(default)]
+    pub withdrawn_at: Option<chrono::DateTime<chrono::Utc>>,
+}
+
+/// Biometric consent record: status plus the retention deadline.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct BiometricConsent {
+    pub status: BiometricConsentStatus,
+    /// BIPA: max 3 years from last interaction. Implementation MUST
+    /// enforce daily expiration sweep against this field.
+    #[serde(default)]
+    pub retention_until: Option<chrono::DateTime<chrono::Utc>>,
+}
+
+/// Both consent tracks for one subject.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct SubjectConsent {
+    pub general_pii: GeneralPiiConsent,
+    pub biometric: BiometricConsent,
+}
+
+/// Retention deadline for the subject's general PII and the policy it
+/// derives from.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct SubjectRetention {
+    pub general_pii_until: chrono::DateTime<chrono::Utc>,
+    /// Free-text policy reference (e.g. "4_year_default", "client_xyz_contract_2026").
+    #[serde(default)]
+    pub policy: String,
+}
+
+/// Pointer from a subject to the rows that belong to it in one dataset.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct SubjectDatasetRef {
+    /// catalogd dataset name (must resolve to an existing manifest).
+    pub name: String,
+    /// Column in that dataset that holds the subject's identifier.
+    pub key_column: String,
+    /// Specific identifier value (typically same as candidate_id; differs
+    /// only when a dataset uses a foreign-key with a different format).
+    pub key_value: String,
+}
+
+/// Per-subject manifest tying a subject token to its consent state,
+/// retention deadline, and the datasets / safe views that hold its data.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SubjectManifest {
+    /// Schema version tag — always "subject_manifest.v1" for v1.
+    /// Filled with that value when absent from the serialized form.
+    #[serde(default = "default_subject_manifest_schema")]
+    pub schema: String,
+    /// Subject identifier (the canonical token, e.g. "CAND-000001").
+    pub candidate_id: String,
+    pub created_at: chrono::DateTime<chrono::Utc>,
+    pub updated_at: chrono::DateTime<chrono::Utc>,
+    pub status: SubjectStatus,
+    #[serde(default)]
+    pub vertical: SubjectVertical,
+    pub consent: SubjectConsent,
+    pub retention: SubjectRetention,
+    /// Datasets that contain rows for this subject. Used by audit-response
+    /// to know what to project. Each entry MUST reference an existing
+    /// catalogd dataset manifest (validated on put_subject).
+    /// Defaults to empty when absent, matching `safe_views`.
+    #[serde(default)]
+    pub datasets: Vec<SubjectDatasetRef>,
+    /// Names of existing AiViews that safely project this subject's data.
+    /// Used by service-tier readers (no PII exposure).
+    #[serde(default)]
+    pub safe_views: Vec<String>,
+    /// Path (relative to repo root) of the per-subject audit JSONL.
+    /// Owned by SubjectAuditWriter (Step 2). Empty until first write.
+    #[serde(default)]
+    pub audit_log_path: String,
+    /// SHA-256 of the most recent HMAC chain checkpoint of the audit log.
+    /// Empty until first audit row written.
+    #[serde(default)]
+    pub audit_log_chain_root: String,
+}
+
+/// Serde default for `SubjectManifest::schema`.
+fn default_subject_manifest_schema() -> String {
+    "subject_manifest.v1".into()
+}