diff --git a/crates/catalogd/src/bin/retention_sweep.rs b/crates/catalogd/src/bin/retention_sweep.rs new file mode 100644 index 0000000..e5abd77 --- /dev/null +++ b/crates/catalogd/src/bin/retention_sweep.rs @@ -0,0 +1,375 @@ +//! Daily retention sweep for subject manifests. +//! +//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 7: +//! "Subjects whose retention.general_pii_until < now AND status != erased +//! get marked for review (don't auto-delete; legal needs to approve)." +//! +//! Per shared::types::BiometricConsent doc-comment (BIPA requirement): +//! "max 3 years from last interaction. Implementation MUST enforce +//! daily expiration sweep against this field." +//! +//! Therefore this binary checks BOTH retention clocks and reports any +//! subject whose general-pii window OR biometric window has expired +//! while the subject is not already erased / already-flagged. +//! +//! It does NOT mutate subject manifests. It produces a JSONL review +//! queue at: +//! data/_catalog/subjects/_retention_sweep_.jsonl +//! +//! Operators / legal review the queue and act: +//! - extend retention (legal contract update), +//! - flip status to retention_expired, +//! - schedule erasure. +//! +//! Usage: +//! cargo run --bin retention_sweep # dry-run, stderr only +//! cargo run --bin retention_sweep -- --apply # also write report file +//! cargo run --bin retention_sweep -- --as-of 2030-01-01T00:00:00Z +//! cargo run --bin retention_sweep -- --storage-root /var/lib/lakehouse/data --apply + +use catalogd::registry::Registry; +use chrono::{DateTime, Utc}; +use serde::Serialize; +use shared::types::{BiometricConsentStatus, SubjectManifest, SubjectStatus}; +use std::path::PathBuf; +use std::sync::Arc; +use storaged::backend; + +#[derive(Debug)] +struct Args { + storage_root: PathBuf, + apply: bool, + as_of: Option>, +} + +fn parse_args() -> Result { + let argv: Vec = std::env::args().collect(); + let mut storage_root = PathBuf::from("./data"); + let mut apply = false; + let mut as_of: Option> = None; + + let mut i = 1; + while i < argv.len() { + let arg = &argv[i]; + let next = || -> Result { + argv.get(i + 1).cloned().ok_or_else(|| format!("{arg} needs a value")) + }; + match arg.as_str() { + "--storage-root" => { storage_root = PathBuf::from(next()?); i += 2; } + "--apply" => { apply = true; i += 1; } + "--as-of" => { + let raw = next()?; + as_of = Some(raw.parse().map_err(|e| format!("--as-of (need RFC3339): {e}"))?); + i += 2; + } + "-h" | "--help" => { print_help(); std::process::exit(0); } + other => return Err(format!("unknown arg: {other}")), + } + } + Ok(Args { storage_root, apply, as_of }) +} + +fn print_help() { + eprintln!("retention_sweep — daily retention check on subject manifests"); + eprintln!(); + eprintln!("Per docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 7."); + eprintln!("Reports overdue subjects to a JSONL review queue. Does NOT"); + eprintln!("auto-delete. Legal must approve every action."); + eprintln!(); + eprintln!("Flags:"); + eprintln!(" --storage-root PATH local-filesystem catalog root (default: ./data)"); + eprintln!(" --apply write report to data/_catalog/subjects/_retention_sweep_.jsonl"); + eprintln!(" --as-of RFC3339 use this timestamp as 'now' (test/forecast helper)"); +} + +#[derive(Debug, PartialEq, Clone, Copy)] +enum OverdueReason { + GeneralPii, + Biometric, + Both, +} + +impl OverdueReason { + fn as_str(self) -> &'static str { + match self { + OverdueReason::GeneralPii => "general_pii_expired", + OverdueReason::Biometric => "biometric_expired", + OverdueReason::Both => "both_expired", + } + } +} + +#[derive(Debug, Serialize)] +struct OverdueRow<'a> { + candidate_id: &'a str, + status: &'a str, + general_pii_until: DateTime, + days_overdue_general: i64, + biometric_status: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + biometric_retention_until: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + days_overdue_biometric: Option, + reason: &'a str, + swept_at: DateTime, +} + +/// Skipped statuses (idempotent across daily runs): +/// - Erased: subject's data is already gone; nothing to review. +/// - RetentionExpired: subject is already flagged; do not re-fire. +/// +/// Actionable statuses: PendingConsent, Active, Withdrawn — each can +/// still hold PII rows on disk past the retention window. +fn is_overdue(subject: &SubjectManifest, now: DateTime) -> Option { + if matches!(subject.status, SubjectStatus::Erased | SubjectStatus::RetentionExpired) { + return None; + } + let general_overdue = subject.retention.general_pii_until < now; + let biometric_overdue = subject + .consent + .biometric + .retention_until + .map(|t| t < now) + .unwrap_or(false); + match (general_overdue, biometric_overdue) { + (true, true) => Some(OverdueReason::Both), + (true, false) => Some(OverdueReason::GeneralPii), + (false, true) => Some(OverdueReason::Biometric), + (false, false) => None, + } +} + +fn status_str(s: &SubjectStatus) -> &'static str { + match s { + SubjectStatus::PendingConsent => "pending_consent", + SubjectStatus::Active => "active", + SubjectStatus::Withdrawn => "withdrawn", + SubjectStatus::RetentionExpired => "retention_expired", + SubjectStatus::Erased => "erased", + } +} + +fn biometric_status_str(s: &BiometricConsentStatus) -> &'static str { + match s { + BiometricConsentStatus::NeverCollected => "never_collected", + BiometricConsentStatus::Pending => "pending", + BiometricConsentStatus::Given => "given", + BiometricConsentStatus::Withdrawn => "withdrawn", + BiometricConsentStatus::Expired => "expired", + } +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 2)] +async fn main() { + let args = match parse_args() { + Ok(a) => a, + Err(e) => { + eprintln!("error: {e}"); + print_help(); + std::process::exit(2); + } + }; + if let Err(e) = run(args).await { + eprintln!("retention_sweep failed: {e}"); + std::process::exit(1); + } +} + +async fn run(args: Args) -> Result<(), String> { + let now = args.as_of.unwrap_or_else(Utc::now); + eprintln!("[retention_sweep] storage-root: {}", args.storage_root.display()); + eprintln!("[retention_sweep] now (sweep clock): {}", now.to_rfc3339()); + eprintln!("[retention_sweep] apply: {}", args.apply); + + let store = backend::init_local(args.storage_root.to_str().ok_or("storage-root not utf-8")?); + let reg = Arc::new(Registry::new(store)); + reg.rebuild().await?; + + let subjects = reg.list_subjects().await; + eprintln!("[retention_sweep] subjects loaded: {}", subjects.len()); + + let mut overdue_rows: Vec = Vec::new(); + let mut by_reason: std::collections::BTreeMap<&'static str, usize> = Default::default(); + + for s in &subjects { + if let Some(reason) = is_overdue(s, now) { + let reason_str: &'static str = reason.as_str(); + *by_reason.entry(reason_str).or_insert(0) += 1; + overdue_rows.push(OverdueRow { + candidate_id: &s.candidate_id, + status: status_str(&s.status), + general_pii_until: s.retention.general_pii_until, + days_overdue_general: (now - s.retention.general_pii_until).num_days(), + biometric_status: biometric_status_str(&s.consent.biometric.status), + biometric_retention_until: s.consent.biometric.retention_until, + days_overdue_biometric: s + .consent + .biometric + .retention_until + .map(|t| (now - t).num_days()), + reason: reason_str, + swept_at: now, + }); + } + } + + eprintln!("[retention_sweep] overdue subjects: {}", overdue_rows.len()); + for (reason, n) in &by_reason { + eprintln!("[retention_sweep] {reason}: {n}"); + } + + if overdue_rows.is_empty() { + eprintln!("[retention_sweep] nothing to do; all subjects within retention window."); + return Ok(()); + } + + eprintln!("[retention_sweep] sample (first 5):"); + for row in overdue_rows.iter().take(5) { + eprintln!( + " - {} status={} reason={} general_overdue_days={}", + row.candidate_id, row.status, row.reason, row.days_overdue_general + ); + } + + if !args.apply { + eprintln!(); + eprintln!("[retention_sweep] DRY-RUN: no file written. Re-run with --apply to persist."); + return Ok(()); + } + + let date_stamp = now.format("%Y-%m-%d"); + let report_path = args + .storage_root + .join("_catalog") + .join("subjects") + .join(format!("_retention_sweep_{date_stamp}.jsonl")); + if let Some(parent) = report_path.parent() { + std::fs::create_dir_all(parent).map_err(|e| format!("mkdir {parent:?}: {e}"))?; + } + let mut buf = String::new(); + for row in &overdue_rows { + let line = serde_json::to_string(row).map_err(|e| format!("serialize row: {e}"))?; + buf.push_str(&line); + buf.push('\n'); + } + std::fs::write(&report_path, &buf).map_err(|e| format!("write {report_path:?}: {e}"))?; + eprintln!("[retention_sweep] report written: {}", report_path.display()); + eprintln!("[retention_sweep] action required by legal/operator. no manifests were mutated."); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration; + use shared::types::{ + BiometricConsent, ConsentStatus, GeneralPiiConsent, SubjectConsent, SubjectRetention, + SubjectVertical, + }; + + fn fixture( + id: &str, + status: SubjectStatus, + gen_until: DateTime, + bio_until: Option>, + ) -> SubjectManifest { + SubjectManifest { + schema: "subject_manifest.v1".into(), + candidate_id: id.into(), + created_at: Utc::now(), + updated_at: Utc::now(), + status, + vertical: SubjectVertical::General, + consent: SubjectConsent { + general_pii: GeneralPiiConsent { + status: ConsentStatus::Given, + version: "v1".into(), + given_at: None, + withdrawn_at: None, + }, + biometric: BiometricConsent { + status: if bio_until.is_some() { + BiometricConsentStatus::Given + } else { + BiometricConsentStatus::NeverCollected + }, + retention_until: bio_until, + }, + }, + retention: SubjectRetention { + general_pii_until: gen_until, + policy: "test".into(), + }, + datasets: vec![], + safe_views: vec![], + audit_log_path: String::new(), + audit_log_chain_root: String::new(), + } + } + + #[test] + fn active_subject_inside_window_not_overdue() { + let now = Utc::now(); + let s = fixture("a", SubjectStatus::Active, now + Duration::days(30), None); + assert!(is_overdue(&s, now).is_none()); + } + + #[test] + fn active_subject_past_general_window_flags_general() { + let now = Utc::now(); + let s = fixture("a", SubjectStatus::Active, now - Duration::days(1), None); + assert_eq!(is_overdue(&s, now), Some(OverdueReason::GeneralPii)); + } + + #[test] + fn erased_subject_never_re_flags_even_if_far_past() { + let now = Utc::now(); + let s = fixture("a", SubjectStatus::Erased, now - Duration::days(1000), None); + assert!(is_overdue(&s, now).is_none(), "erased subjects must not re-fire"); + } + + #[test] + fn already_retention_expired_is_idempotent() { + let now = Utc::now(); + let s = fixture("a", SubjectStatus::RetentionExpired, now - Duration::days(1), None); + assert!(is_overdue(&s, now).is_none(), "already-flagged subjects must not re-fire"); + } + + #[test] + fn biometric_overdue_alone_triggers_bipa_path() { + let now = Utc::now(); + let s = fixture( + "a", + SubjectStatus::Active, + now + Duration::days(30), + Some(now - Duration::days(1)), + ); + assert_eq!(is_overdue(&s, now), Some(OverdueReason::Biometric)); + } + + #[test] + fn both_clocks_overdue_reports_both() { + let now = Utc::now(); + let s = fixture( + "a", + SubjectStatus::Active, + now - Duration::days(2), + Some(now - Duration::days(1)), + ); + assert_eq!(is_overdue(&s, now), Some(OverdueReason::Both)); + } + + #[test] + fn pending_consent_subject_can_still_expire() { + let now = Utc::now(); + let s = fixture("a", SubjectStatus::PendingConsent, now - Duration::days(1), None); + assert_eq!(is_overdue(&s, now), Some(OverdueReason::GeneralPii)); + } + + #[test] + fn withdrawn_subject_still_subject_to_retention() { + let now = Utc::now(); + let s = fixture("a", SubjectStatus::Withdrawn, now - Duration::days(1), None); + assert_eq!(is_overdue(&s, now), Some(OverdueReason::GeneralPii)); + } +}