catalogd: Step 7 — daily retention sweep binary
Per docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 7:
"Subjects whose retention.general_pii_until < now AND status != erased
get marked for review (don't auto-delete; legal needs to approve)."
Per shared::types::BiometricConsent doc-comment (BIPA requirement on
biometric data, max 3 years from last interaction):
"Implementation MUST enforce daily expiration sweep against this field."
Therefore the sweep checks BOTH retention clocks. Reports overdue
subjects to data/_catalog/subjects/_retention_sweep_<YYYY-MM-DD>.jsonl.
Idempotent: subjects already in {Erased, RetentionExpired} are skipped
so daily runs do not append duplicate rows.
Does NOT mutate subject manifests. Legal/operator owns the action
(extend, flip status, schedule erasure).
CLI:
retention_sweep # dry-run (default), stderr only
retention_sweep --apply # also write JSONL report
retention_sweep --as-of <RFC3339> # alternate clock for forecast/test
retention_sweep --storage-root <dir> # default ./data
Tests: 8 unit tests on is_overdue covering all 5 SubjectStatus values,
both clocks, BIPA-only path, and idempotency on already-flagged
subjects.
Live verification (100 subjects in ./data/_catalog/subjects):
- now (2026-05-03): 0 overdue (correct — 4-year retention)
- --as-of 2031-06-01: 100 overdue, 394 days past, jsonl report shape
verified with biometric fields correctly omitted via
serde skip_serializing_if when subject has no biometric clock.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
2a4b316a15
commit
8fc6238dea
375
crates/catalogd/src/bin/retention_sweep.rs
Normal file
375
crates/catalogd/src/bin/retention_sweep.rs
Normal file
@ -0,0 +1,375 @@
|
|||||||
|
//! Daily retention sweep for subject manifests.
|
||||||
|
//!
|
||||||
|
//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 7:
|
||||||
|
//! "Subjects whose retention.general_pii_until < now AND status != erased
|
||||||
|
//! get marked for review (don't auto-delete; legal needs to approve)."
|
||||||
|
//!
|
||||||
|
//! Per shared::types::BiometricConsent doc-comment (BIPA requirement):
|
||||||
|
//! "max 3 years from last interaction. Implementation MUST enforce
|
||||||
|
//! daily expiration sweep against this field."
|
||||||
|
//!
|
||||||
|
//! Therefore this binary checks BOTH retention clocks and reports any
|
||||||
|
//! subject whose general-pii window OR biometric window has expired
|
||||||
|
//! while the subject is not already erased / already-flagged.
|
||||||
|
//!
|
||||||
|
//! It does NOT mutate subject manifests. It produces a JSONL review
|
||||||
|
//! queue at:
|
||||||
|
//! data/_catalog/subjects/_retention_sweep_<YYYY-MM-DD>.jsonl
|
||||||
|
//!
|
||||||
|
//! Operators / legal review the queue and act:
|
||||||
|
//! - extend retention (legal contract update),
|
||||||
|
//! - flip status to retention_expired,
|
||||||
|
//! - schedule erasure.
|
||||||
|
//!
|
||||||
|
//! Usage:
|
||||||
|
//! cargo run --bin retention_sweep # dry-run, stderr only
|
||||||
|
//! cargo run --bin retention_sweep -- --apply # also write report file
|
||||||
|
//! cargo run --bin retention_sweep -- --as-of 2030-01-01T00:00:00Z
|
||||||
|
//! cargo run --bin retention_sweep -- --storage-root /var/lib/lakehouse/data --apply
|
||||||
|
|
||||||
|
use catalogd::registry::Registry;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde::Serialize;
|
||||||
|
use shared::types::{BiometricConsentStatus, SubjectManifest, SubjectStatus};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use storaged::backend;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Args {
|
||||||
|
storage_root: PathBuf,
|
||||||
|
apply: bool,
|
||||||
|
as_of: Option<DateTime<Utc>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_args() -> Result<Args, String> {
|
||||||
|
let argv: Vec<String> = std::env::args().collect();
|
||||||
|
let mut storage_root = PathBuf::from("./data");
|
||||||
|
let mut apply = false;
|
||||||
|
let mut as_of: Option<DateTime<Utc>> = None;
|
||||||
|
|
||||||
|
let mut i = 1;
|
||||||
|
while i < argv.len() {
|
||||||
|
let arg = &argv[i];
|
||||||
|
let next = || -> Result<String, String> {
|
||||||
|
argv.get(i + 1).cloned().ok_or_else(|| format!("{arg} needs a value"))
|
||||||
|
};
|
||||||
|
match arg.as_str() {
|
||||||
|
"--storage-root" => { storage_root = PathBuf::from(next()?); i += 2; }
|
||||||
|
"--apply" => { apply = true; i += 1; }
|
||||||
|
"--as-of" => {
|
||||||
|
let raw = next()?;
|
||||||
|
as_of = Some(raw.parse().map_err(|e| format!("--as-of (need RFC3339): {e}"))?);
|
||||||
|
i += 2;
|
||||||
|
}
|
||||||
|
"-h" | "--help" => { print_help(); std::process::exit(0); }
|
||||||
|
other => return Err(format!("unknown arg: {other}")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Args { storage_root, apply, as_of })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn print_help() {
|
||||||
|
eprintln!("retention_sweep — daily retention check on subject manifests");
|
||||||
|
eprintln!();
|
||||||
|
eprintln!("Per docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 7.");
|
||||||
|
eprintln!("Reports overdue subjects to a JSONL review queue. Does NOT");
|
||||||
|
eprintln!("auto-delete. Legal must approve every action.");
|
||||||
|
eprintln!();
|
||||||
|
eprintln!("Flags:");
|
||||||
|
eprintln!(" --storage-root PATH local-filesystem catalog root (default: ./data)");
|
||||||
|
eprintln!(" --apply write report to data/_catalog/subjects/_retention_sweep_<date>.jsonl");
|
||||||
|
eprintln!(" --as-of RFC3339 use this timestamp as 'now' (test/forecast helper)");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||||
|
enum OverdueReason {
|
||||||
|
GeneralPii,
|
||||||
|
Biometric,
|
||||||
|
Both,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OverdueReason {
|
||||||
|
fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
OverdueReason::GeneralPii => "general_pii_expired",
|
||||||
|
OverdueReason::Biometric => "biometric_expired",
|
||||||
|
OverdueReason::Both => "both_expired",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct OverdueRow<'a> {
|
||||||
|
candidate_id: &'a str,
|
||||||
|
status: &'a str,
|
||||||
|
general_pii_until: DateTime<Utc>,
|
||||||
|
days_overdue_general: i64,
|
||||||
|
biometric_status: &'a str,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
biometric_retention_until: Option<DateTime<Utc>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
days_overdue_biometric: Option<i64>,
|
||||||
|
reason: &'a str,
|
||||||
|
swept_at: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Skipped statuses (idempotent across daily runs):
|
||||||
|
/// - Erased: subject's data is already gone; nothing to review.
|
||||||
|
/// - RetentionExpired: subject is already flagged; do not re-fire.
|
||||||
|
///
|
||||||
|
/// Actionable statuses: PendingConsent, Active, Withdrawn — each can
|
||||||
|
/// still hold PII rows on disk past the retention window.
|
||||||
|
fn is_overdue(subject: &SubjectManifest, now: DateTime<Utc>) -> Option<OverdueReason> {
|
||||||
|
if matches!(subject.status, SubjectStatus::Erased | SubjectStatus::RetentionExpired) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let general_overdue = subject.retention.general_pii_until < now;
|
||||||
|
let biometric_overdue = subject
|
||||||
|
.consent
|
||||||
|
.biometric
|
||||||
|
.retention_until
|
||||||
|
.map(|t| t < now)
|
||||||
|
.unwrap_or(false);
|
||||||
|
match (general_overdue, biometric_overdue) {
|
||||||
|
(true, true) => Some(OverdueReason::Both),
|
||||||
|
(true, false) => Some(OverdueReason::GeneralPii),
|
||||||
|
(false, true) => Some(OverdueReason::Biometric),
|
||||||
|
(false, false) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status_str(s: &SubjectStatus) -> &'static str {
|
||||||
|
match s {
|
||||||
|
SubjectStatus::PendingConsent => "pending_consent",
|
||||||
|
SubjectStatus::Active => "active",
|
||||||
|
SubjectStatus::Withdrawn => "withdrawn",
|
||||||
|
SubjectStatus::RetentionExpired => "retention_expired",
|
||||||
|
SubjectStatus::Erased => "erased",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn biometric_status_str(s: &BiometricConsentStatus) -> &'static str {
|
||||||
|
match s {
|
||||||
|
BiometricConsentStatus::NeverCollected => "never_collected",
|
||||||
|
BiometricConsentStatus::Pending => "pending",
|
||||||
|
BiometricConsentStatus::Given => "given",
|
||||||
|
BiometricConsentStatus::Withdrawn => "withdrawn",
|
||||||
|
BiometricConsentStatus::Expired => "expired",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn main() {
|
||||||
|
let args = match parse_args() {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("error: {e}");
|
||||||
|
print_help();
|
||||||
|
std::process::exit(2);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = run(args).await {
|
||||||
|
eprintln!("retention_sweep failed: {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(args: Args) -> Result<(), String> {
|
||||||
|
let now = args.as_of.unwrap_or_else(Utc::now);
|
||||||
|
eprintln!("[retention_sweep] storage-root: {}", args.storage_root.display());
|
||||||
|
eprintln!("[retention_sweep] now (sweep clock): {}", now.to_rfc3339());
|
||||||
|
eprintln!("[retention_sweep] apply: {}", args.apply);
|
||||||
|
|
||||||
|
let store = backend::init_local(args.storage_root.to_str().ok_or("storage-root not utf-8")?);
|
||||||
|
let reg = Arc::new(Registry::new(store));
|
||||||
|
reg.rebuild().await?;
|
||||||
|
|
||||||
|
let subjects = reg.list_subjects().await;
|
||||||
|
eprintln!("[retention_sweep] subjects loaded: {}", subjects.len());
|
||||||
|
|
||||||
|
let mut overdue_rows: Vec<OverdueRow> = Vec::new();
|
||||||
|
let mut by_reason: std::collections::BTreeMap<&'static str, usize> = Default::default();
|
||||||
|
|
||||||
|
for s in &subjects {
|
||||||
|
if let Some(reason) = is_overdue(s, now) {
|
||||||
|
let reason_str: &'static str = reason.as_str();
|
||||||
|
*by_reason.entry(reason_str).or_insert(0) += 1;
|
||||||
|
overdue_rows.push(OverdueRow {
|
||||||
|
candidate_id: &s.candidate_id,
|
||||||
|
status: status_str(&s.status),
|
||||||
|
general_pii_until: s.retention.general_pii_until,
|
||||||
|
days_overdue_general: (now - s.retention.general_pii_until).num_days(),
|
||||||
|
biometric_status: biometric_status_str(&s.consent.biometric.status),
|
||||||
|
biometric_retention_until: s.consent.biometric.retention_until,
|
||||||
|
days_overdue_biometric: s
|
||||||
|
.consent
|
||||||
|
.biometric
|
||||||
|
.retention_until
|
||||||
|
.map(|t| (now - t).num_days()),
|
||||||
|
reason: reason_str,
|
||||||
|
swept_at: now,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
eprintln!("[retention_sweep] overdue subjects: {}", overdue_rows.len());
|
||||||
|
for (reason, n) in &by_reason {
|
||||||
|
eprintln!("[retention_sweep] {reason}: {n}");
|
||||||
|
}
|
||||||
|
|
||||||
|
if overdue_rows.is_empty() {
|
||||||
|
eprintln!("[retention_sweep] nothing to do; all subjects within retention window.");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
eprintln!("[retention_sweep] sample (first 5):");
|
||||||
|
for row in overdue_rows.iter().take(5) {
|
||||||
|
eprintln!(
|
||||||
|
" - {} status={} reason={} general_overdue_days={}",
|
||||||
|
row.candidate_id, row.status, row.reason, row.days_overdue_general
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if !args.apply {
|
||||||
|
eprintln!();
|
||||||
|
eprintln!("[retention_sweep] DRY-RUN: no file written. Re-run with --apply to persist.");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let date_stamp = now.format("%Y-%m-%d");
|
||||||
|
let report_path = args
|
||||||
|
.storage_root
|
||||||
|
.join("_catalog")
|
||||||
|
.join("subjects")
|
||||||
|
.join(format!("_retention_sweep_{date_stamp}.jsonl"));
|
||||||
|
if let Some(parent) = report_path.parent() {
|
||||||
|
std::fs::create_dir_all(parent).map_err(|e| format!("mkdir {parent:?}: {e}"))?;
|
||||||
|
}
|
||||||
|
let mut buf = String::new();
|
||||||
|
for row in &overdue_rows {
|
||||||
|
let line = serde_json::to_string(row).map_err(|e| format!("serialize row: {e}"))?;
|
||||||
|
buf.push_str(&line);
|
||||||
|
buf.push('\n');
|
||||||
|
}
|
||||||
|
std::fs::write(&report_path, &buf).map_err(|e| format!("write {report_path:?}: {e}"))?;
|
||||||
|
eprintln!("[retention_sweep] report written: {}", report_path.display());
|
||||||
|
eprintln!("[retention_sweep] action required by legal/operator. no manifests were mutated.");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use chrono::Duration;
|
||||||
|
use shared::types::{
|
||||||
|
BiometricConsent, ConsentStatus, GeneralPiiConsent, SubjectConsent, SubjectRetention,
|
||||||
|
SubjectVertical,
|
||||||
|
};
|
||||||
|
|
||||||
|
fn fixture(
|
||||||
|
id: &str,
|
||||||
|
status: SubjectStatus,
|
||||||
|
gen_until: DateTime<Utc>,
|
||||||
|
bio_until: Option<DateTime<Utc>>,
|
||||||
|
) -> SubjectManifest {
|
||||||
|
SubjectManifest {
|
||||||
|
schema: "subject_manifest.v1".into(),
|
||||||
|
candidate_id: id.into(),
|
||||||
|
created_at: Utc::now(),
|
||||||
|
updated_at: Utc::now(),
|
||||||
|
status,
|
||||||
|
vertical: SubjectVertical::General,
|
||||||
|
consent: SubjectConsent {
|
||||||
|
general_pii: GeneralPiiConsent {
|
||||||
|
status: ConsentStatus::Given,
|
||||||
|
version: "v1".into(),
|
||||||
|
given_at: None,
|
||||||
|
withdrawn_at: None,
|
||||||
|
},
|
||||||
|
biometric: BiometricConsent {
|
||||||
|
status: if bio_until.is_some() {
|
||||||
|
BiometricConsentStatus::Given
|
||||||
|
} else {
|
||||||
|
BiometricConsentStatus::NeverCollected
|
||||||
|
},
|
||||||
|
retention_until: bio_until,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
retention: SubjectRetention {
|
||||||
|
general_pii_until: gen_until,
|
||||||
|
policy: "test".into(),
|
||||||
|
},
|
||||||
|
datasets: vec![],
|
||||||
|
safe_views: vec![],
|
||||||
|
audit_log_path: String::new(),
|
||||||
|
audit_log_chain_root: String::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn active_subject_inside_window_not_overdue() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture("a", SubjectStatus::Active, now + Duration::days(30), None);
|
||||||
|
assert!(is_overdue(&s, now).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn active_subject_past_general_window_flags_general() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture("a", SubjectStatus::Active, now - Duration::days(1), None);
|
||||||
|
assert_eq!(is_overdue(&s, now), Some(OverdueReason::GeneralPii));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn erased_subject_never_re_flags_even_if_far_past() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture("a", SubjectStatus::Erased, now - Duration::days(1000), None);
|
||||||
|
assert!(is_overdue(&s, now).is_none(), "erased subjects must not re-fire");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn already_retention_expired_is_idempotent() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture("a", SubjectStatus::RetentionExpired, now - Duration::days(1), None);
|
||||||
|
assert!(is_overdue(&s, now).is_none(), "already-flagged subjects must not re-fire");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn biometric_overdue_alone_triggers_bipa_path() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture(
|
||||||
|
"a",
|
||||||
|
SubjectStatus::Active,
|
||||||
|
now + Duration::days(30),
|
||||||
|
Some(now - Duration::days(1)),
|
||||||
|
);
|
||||||
|
assert_eq!(is_overdue(&s, now), Some(OverdueReason::Biometric));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn both_clocks_overdue_reports_both() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture(
|
||||||
|
"a",
|
||||||
|
SubjectStatus::Active,
|
||||||
|
now - Duration::days(2),
|
||||||
|
Some(now - Duration::days(1)),
|
||||||
|
);
|
||||||
|
assert_eq!(is_overdue(&s, now), Some(OverdueReason::Both));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pending_consent_subject_can_still_expire() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture("a", SubjectStatus::PendingConsent, now - Duration::days(1), None);
|
||||||
|
assert_eq!(is_overdue(&s, now), Some(OverdueReason::GeneralPii));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn withdrawn_subject_still_subject_to_retention() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let s = fixture("a", SubjectStatus::Withdrawn, now - Duration::days(1), None);
|
||||||
|
assert_eq!(is_overdue(&s, now), Some(OverdueReason::GeneralPii));
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user