catalogd: Step 1 — SubjectManifest type + Registry CRUD

Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md Step 1.
Mirrors the existing AiView put/get/list/delete pattern. NOT a separate
daemon, NOT new infrastructure — extends catalogd's manifest layer with
one more manifest type (subject) alongside dataset/view/tombstone/profile.

shared/types.rs additions:
- SubjectManifest (the wire format from spec §2)
- SubjectStatus enum: pending_consent | active | withdrawn |
  retention_expired | erased
- SubjectVertical enum: unknown | general | healthcare | finance | other
  (default = Unknown for fail-closed routing per spec §2.1)
- ConsentStatus enum: pending_backfill_review | pending_first_contact |
  given | withdrawn | expired
- BiometricConsentStatus enum: never_collected | pending | given |
  withdrawn | expired
- GeneralPiiConsent + BiometricConsent + SubjectConsent
- SubjectRetention (general_pii_until + policy)
- SubjectDatasetRef (name + key_column + key_value pointing at existing
  catalogd dataset manifests)
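
A sketch of the v1 wire format as it would land at _catalog/subjects/<id>.json,
assembled from the field list above (all values here are hypothetical;
`created_at`/`updated_at` are required, while `safe_views`, `audit_log_path`,
and `audit_log_chain_root` default to empty via serde):

```json
{
  "schema": "subject_manifest.v1",
  "candidate_id": "CAND-000001",
  "created_at": "2026-05-03T00:00:00Z",
  "updated_at": "2026-05-03T00:00:00Z",
  "status": "active",
  "vertical": "unknown",
  "consent": {
    "general_pii": { "status": "pending_backfill_review", "version": "" },
    "biometric": { "status": "never_collected" }
  },
  "retention": {
    "general_pii_until": "2030-05-03T00:00:00Z",
    "policy": "4_year_default"
  },
  "datasets": [
    { "name": "workers", "key_column": "candidate_id", "key_value": "CAND-000001" }
  ],
  "safe_views": []
}
```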

catalogd/registry.rs additions:
- subjects: Arc<RwLock<HashMap<String, SubjectManifest>>> field on Registry
- put_subject() — validates dataset refs, persists to
  _catalog/subjects/<id>.json, updates in-memory cache
- get_subject() / list_subjects() / delete_subject() / subjects_count()
- rebuild() now loads subject manifests at startup alongside views +
  profiles + tombstones
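
The cache side of that CRUD surface can be sketched std-only (the real
Registry uses tokio's async RwLock and additionally persists JSON to
object storage under _catalog/subjects/ — `SubjectCache` here is a
hypothetical stand-in, not a type from this commit):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Std-only sketch of the in-memory half of put/get/delete/count.
// Values are the serialized manifests; the real code stores typed
// SubjectManifest structs keyed by candidate_id.
#[derive(Default)]
struct SubjectCache {
    subjects: RwLock<HashMap<String, String>>,
}

impl SubjectCache {
    fn put(&self, id: &str, json: &str) {
        self.subjects
            .write()
            .unwrap()
            .insert(id.to_string(), json.to_string());
    }
    fn get(&self, id: &str) -> Option<String> {
        self.subjects.read().unwrap().get(id).cloned()
    }
    fn delete(&self, id: &str) -> Option<String> {
        self.subjects.write().unwrap().remove(id)
    }
    fn count(&self) -> usize {
        self.subjects.read().unwrap().len()
    }
}

fn main() {
    let cache = SubjectCache::default();
    cache.put("CAND-000001", "{}");
    assert_eq!(cache.count(), 1);
    assert!(cache.get("CAND-000001").is_some());
    cache.delete("CAND-000001");
    assert_eq!(cache.count(), 0);
    println!("ok");
}
```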

Tests (5/5 passing):
- put_subject_with_no_dataset_refs_succeeds
- put_subject_rejects_dangling_dataset_ref (validation works)
- put_subject_with_valid_dataset_ref_succeeds
- subject_round_trips_through_object_store (persistence works)
- delete_subject_removes_in_memory_and_persistence

NOT in this commit (future steps):
- Step 2: SubjectAuditWriter with HMAC chain
- Step 3: Backfill ETL from workers_500k.parquet
- Steps 4-5: Wire gateway tool registry + validator to write audit rows
- Step 6: /audit/subject/{id} HTTP endpoint
- Step 7: Daily retention sweep

cargo check --workspace: clean. cargo test -p catalogd subject: 5/5 PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
root 2026-05-03 03:13:08 -05:00
parent ed1fcd3c26
commit d25990982c
2 changed files with 342 additions and 1 deletion


@@ -1,6 +1,6 @@
use shared::types::{
AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ModelProfile,
-    ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone,
+    ObjectRef, RefreshPolicy, SchemaFingerprint, Sensitivity, SubjectManifest, Tombstone,
};
use crate::tombstones::TombstoneStore;
@@ -66,16 +66,20 @@ pub struct MetadataUpdate {
const MANIFEST_PREFIX: &str = "_catalog/manifests";
const VIEW_PREFIX: &str = "_catalog/views";
const SUBJECT_PREFIX: &str = "_catalog/subjects";
const PROFILE_PREFIX: &str = "_catalog/profiles";
/// In-memory dataset registry backed by manifest persistence in object storage.
/// Also tracks AiViews (Phase D) — safe projections over base datasets.
/// And tombstones (Phase E) — soft-delete markers applied at query time.
/// And subject manifests (compliance/access metadata layer) — see
/// docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md.
#[derive(Clone)]
pub struct Registry {
datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>,
views: Arc<RwLock<HashMap<String, AiView>>>,
profiles: Arc<RwLock<HashMap<String, ModelProfile>>>,
subjects: Arc<RwLock<HashMap<String, SubjectManifest>>>,
tombstones: TombstoneStore,
store: Arc<dyn ObjectStore>,
}
@@ -86,6 +90,7 @@ impl Registry {
datasets: Arc::new(RwLock::new(HashMap::new())),
views: Arc::new(RwLock::new(HashMap::new())),
profiles: Arc::new(RwLock::new(HashMap::new())),
subjects: Arc::new(RwLock::new(HashMap::new())),
tombstones: TombstoneStore::new(store.clone()),
store,
}
@@ -179,6 +184,28 @@ impl Registry {
tracing::info!("catalog: {} model profiles loaded", profiles.len());
}
// Subject manifests (compliance/access metadata layer).
let subject_keys = ops::list(&self.store, Some(SUBJECT_PREFIX)).await.unwrap_or_default();
let mut subjects = self.subjects.write().await;
subjects.clear();
for key in &subject_keys {
if !key.ends_with(".json") { continue; }
let data = match ops::get(&self.store, key).await {
Ok(d) => d,
Err(e) => {
tracing::warn!("subject '{key}': read failed: {e}");
continue;
}
};
match serde_json::from_slice::<SubjectManifest>(&data) {
Ok(subj) => { subjects.insert(subj.candidate_id.clone(), subj); }
Err(e) => tracing::warn!("subject '{key}': parse failed: {e}"),
}
}
if !subjects.is_empty() {
tracing::info!("catalog: {} subject manifests loaded", subjects.len());
}
Ok(count)
}
@@ -686,6 +713,64 @@ impl Registry {
Ok(())
}
// --- Subject manifests (compliance/access metadata layer) ---
// Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md
// Mirrors the put_view / get_view / list_views / delete_view shape.
// The audit log per subject is owned separately by SubjectAuditWriter
// (Step 2) — this layer just persists the manifest itself.
/// Create or replace a subject manifest. Validates that referenced
/// datasets exist in the catalog (fail-fast so dangling references
/// don't accumulate). Persists to `_catalog/subjects/<id>.json`.
pub async fn put_subject(&self, mut subj: SubjectManifest) -> Result<SubjectManifest, String> {
if subj.candidate_id.is_empty() {
return Err("subject candidate_id is empty".into());
}
for ds in &subj.datasets {
if self.get_by_name(&ds.name).await.is_none() {
return Err(format!(
"subject '{}' references dataset '{}' which is not in the catalog",
subj.candidate_id, ds.name,
));
}
}
let now = chrono::Utc::now();
if subj.created_at.timestamp() == 0 {
subj.created_at = now;
}
subj.updated_at = now;
let key = format!("{SUBJECT_PREFIX}/{}.json", sanitize_view_name(&subj.candidate_id));
let json = serde_json::to_vec_pretty(&subj).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
let mut subjects = self.subjects.write().await;
subjects.insert(subj.candidate_id.clone(), subj.clone());
tracing::debug!("subject manifest persisted: {}", subj.candidate_id);
Ok(subj)
}
pub async fn get_subject(&self, candidate_id: &str) -> Option<SubjectManifest> {
self.subjects.read().await.get(candidate_id).cloned()
}
pub async fn list_subjects(&self) -> Vec<SubjectManifest> {
self.subjects.read().await.values().cloned().collect()
}
pub async fn delete_subject(&self, candidate_id: &str) -> Result<(), String> {
let key = format!("{SUBJECT_PREFIX}/{}.json", sanitize_view_name(candidate_id));
ops::delete(&self.store, &key).await?;
self.subjects.write().await.remove(candidate_id);
Ok(())
}
/// Subject count — convenience for the daily retention sweep + audit
/// dashboards that don't need the full list.
pub async fn subjects_count(&self) -> usize {
self.subjects.read().await.len()
}
/// Remove a dataset from the catalog by name. Deletes the manifest
/// from both the in-memory registry and object storage.
///
@@ -1027,4 +1112,114 @@ mod tests {
assert!(reg.get(&loser_a).await.is_none());
assert!(reg.get(&loser_b).await.is_none());
}
// --- Subject manifests (Step 1 of subject_manifests_on_catalogd) ---
fn fixture_subject(id: &str) -> shared::types::SubjectManifest {
use shared::types::{
BiometricConsent, BiometricConsentStatus, ConsentStatus,
GeneralPiiConsent, SubjectConsent, SubjectManifest, SubjectRetention,
SubjectStatus, SubjectVertical,
};
SubjectManifest {
schema: "subject_manifest.v1".into(),
candidate_id: id.to_string(),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
status: SubjectStatus::Active,
vertical: SubjectVertical::Unknown,
consent: SubjectConsent {
general_pii: GeneralPiiConsent {
status: ConsentStatus::PendingBackfillReview,
version: String::new(),
given_at: None,
withdrawn_at: None,
},
biometric: BiometricConsent {
status: BiometricConsentStatus::NeverCollected,
retention_until: None,
},
},
retention: SubjectRetention {
general_pii_until: chrono::Utc::now() + chrono::Duration::days(365 * 4),
policy: "4_year_default".into(),
},
datasets: vec![],
safe_views: vec![],
audit_log_path: String::new(),
audit_log_chain_root: String::new(),
}
}
#[tokio::test]
async fn put_subject_with_no_dataset_refs_succeeds() {
let reg = fixture();
let s = fixture_subject("CAND-000001");
let stored = reg.put_subject(s).await.unwrap();
assert_eq!(stored.candidate_id, "CAND-000001");
assert_eq!(reg.subjects_count().await, 1);
}
#[tokio::test]
async fn put_subject_rejects_dangling_dataset_ref() {
use shared::types::SubjectDatasetRef;
let reg = fixture();
let mut s = fixture_subject("CAND-000002");
s.datasets.push(SubjectDatasetRef {
name: "no_such_dataset".into(),
key_column: "candidate_id".into(),
key_value: "CAND-000002".into(),
});
let err = reg.put_subject(s).await.unwrap_err();
assert!(err.contains("not in the catalog"), "got: {err}");
}
#[tokio::test]
async fn put_subject_with_valid_dataset_ref_succeeds() {
use shared::types::SubjectDatasetRef;
let reg = fixture();
// Register a dataset first so the subject's reference resolves.
reg.register("workers".into(), fp("w1"), vec![obj("w.parquet")]).await.unwrap();
let mut s = fixture_subject("CAND-000003");
s.datasets.push(SubjectDatasetRef {
name: "workers".into(),
key_column: "candidate_id".into(),
key_value: "CAND-000003".into(),
});
let stored = reg.put_subject(s).await.unwrap();
assert_eq!(stored.datasets.len(), 1);
assert_eq!(stored.datasets[0].name, "workers");
}
#[tokio::test]
async fn subject_round_trips_through_object_store() {
let store = Arc::new(object_store::memory::InMemory::new());
let reg = Registry::new(store.clone());
reg.put_subject(fixture_subject("CAND-RT-001")).await.unwrap();
// Open a fresh registry over the same store and load → confirms
// persistence + load_all path actually round-trips the manifest.
let reg2 = Registry::new(store);
reg2.rebuild().await.unwrap();
let loaded = reg2.get_subject("CAND-RT-001").await;
assert!(loaded.is_some(), "subject not loaded from store");
assert_eq!(loaded.unwrap().status, shared::types::SubjectStatus::Active);
}
#[tokio::test]
async fn delete_subject_removes_in_memory_and_persistence() {
let store = Arc::new(object_store::memory::InMemory::new());
let reg = Registry::new(store.clone());
reg.put_subject(fixture_subject("CAND-DEL-001")).await.unwrap();
assert_eq!(reg.subjects_count().await, 1);
reg.delete_subject("CAND-DEL-001").await.unwrap();
assert_eq!(reg.subjects_count().await, 0);
// Fresh registry over same store — confirm it's also gone from persistence.
let reg2 = Registry::new(store);
reg2.rebuild().await.unwrap();
assert_eq!(reg2.subjects_count().await, 0);
}
}


@@ -364,3 +364,149 @@ pub enum Redaction {
keep_suffix: usize,
},
}
// ─── Subject manifests (compliance/access metadata layer) ────────────
// Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md
//
// A SubjectManifest is a per-person record threading existing catalogd
// primitives (datasets, views, tombstones) together by subject token.
// Stored at _catalog/subjects/<candidate_id>.json. NOT a separate
// daemon — extends catalogd's existing manifest pattern.
//
// The audit log per subject (HMAC-chained JSONL at
// _catalog/subjects/<candidate_id>.audit.jsonl) is owned by a separate
// SubjectAuditWriter (Step 2) — not modeled here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SubjectStatus {
PendingConsent,
Active,
Withdrawn,
RetentionExpired,
Erased,
}
/// Vertical drives healthcare-PHI routing. Default Unknown is fail-closed:
/// the gateway treats Unknown as healthcare-equivalent until reclassified
/// (per docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §2.1).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SubjectVertical {
Unknown,
General,
Healthcare,
Finance,
Other,
}
impl Default for SubjectVertical {
fn default() -> Self { SubjectVertical::Unknown }
}
/// State machine for general-PII consent. BIPA-specific biometric
/// consent is tracked separately on `BiometricConsent`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ConsentStatus {
/// Backfilled from existing data sources before consent was tracked.
/// SHOULD route as if no consent exists until reviewed.
PendingBackfillReview,
/// New subject; awaiting consent UX.
PendingFirstContact,
Given,
Withdrawn,
Expired,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum BiometricConsentStatus {
/// Default. No biometric data exists for this subject.
NeverCollected,
Pending,
Given,
Withdrawn,
Expired,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeneralPiiConsent {
pub status: ConsentStatus,
/// Reference to the consent template version (matches a row in the
/// future consent_versions table). Empty when status is pending_*.
#[serde(default)]
pub version: String,
#[serde(default)]
pub given_at: Option<chrono::DateTime<chrono::Utc>>,
#[serde(default)]
pub withdrawn_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BiometricConsent {
pub status: BiometricConsentStatus,
/// BIPA: max 3 years from last interaction. Implementation MUST
/// enforce daily expiration sweep against this field.
#[serde(default)]
pub retention_until: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubjectConsent {
pub general_pii: GeneralPiiConsent,
pub biometric: BiometricConsent,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubjectRetention {
pub general_pii_until: chrono::DateTime<chrono::Utc>,
/// Free-text policy reference (e.g. "4_year_default", "client_xyz_contract_2026").
#[serde(default)]
pub policy: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubjectDatasetRef {
/// catalogd dataset name (must resolve to an existing manifest).
pub name: String,
/// Column in that dataset that holds the subject's identifier.
pub key_column: String,
/// Specific identifier value (typically same as candidate_id; differs
/// only when a dataset uses a foreign-key with a different format).
pub key_value: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubjectManifest {
/// Schema version tag — always "subject_manifest.v1" for v1.
#[serde(default = "default_subject_manifest_schema")]
pub schema: String,
/// Subject identifier (the canonical token, e.g. "CAND-000001").
pub candidate_id: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub status: SubjectStatus,
#[serde(default)]
pub vertical: SubjectVertical,
pub consent: SubjectConsent,
pub retention: SubjectRetention,
/// Datasets that contain rows for this subject. Used by audit-response
/// to know what to project. Each entry MUST reference an existing
/// catalogd dataset manifest (validated on put_subject).
pub datasets: Vec<SubjectDatasetRef>,
/// Names of existing AiViews that safely project this subject's data.
/// Used by service-tier readers (no PII exposure).
#[serde(default)]
pub safe_views: Vec<String>,
/// Path (relative to repo root) of the per-subject audit JSONL.
/// Owned by SubjectAuditWriter (Step 2). Empty until first write.
#[serde(default)]
pub audit_log_path: String,
/// SHA-256 of the most recent HMAC chain checkpoint of the audit log.
/// Empty until first audit row written.
#[serde(default)]
pub audit_log_chain_root: String,
}
fn default_subject_manifest_schema() -> String { "subject_manifest.v1".into() }