From bce6dfd1eecb2f1c8957d9d8e13499ff1190f75d Mon Sep 17 00:00:00 2001 From: root Date: Sun, 3 May 2026 03:22:54 -0500 Subject: [PATCH] =?UTF-8?q?catalogd:=20Step=203=20=E2=80=94=20backfill=5Fs?= =?UTF-8?q?ubjects=20binary=20(BIPA-defensible=20defaults)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 3. Reads a parquet source, creates one SubjectManifest per row with the spec-defined safe defaults, persists via Registry::put_subject(). Defaults baked in (per spec §2 + §5 Step 5): - vertical = unknown (HIPAA fail-closed) - consent.general_pii = pending_backfill_review (NOT inferred_existing — BIPA defense) - consent.biometric = never_collected (no biometric data backfilled) - retention.general_pii_until = now + 4 years - retention.policy = "4_year_default" Conservative ergonomics: - --limit 1000 by default. --all to do the full source. - --dry-run for parse + count + sample without writes. - --concurrency 32 (bounded via tokio::sync::Semaphore). - Idempotent: skips subjects that already exist in catalog. - Progress reports every ~5% (or 5K rows, whichever smaller). Live verification on workers_500k.parquet: --limit 100 dry-run: parsed 100 rows, sampled WORKER-1..5, 0 writes ✓ --limit 100 commit: 100 inserted, 0 failed, 100 files in data/_catalog/subjects/ ✓ --limit 100 re-run: 0 inserted, 100 skipped (idempotent) ✓ Sample manifest (data/_catalog/subjects/WORKER-1.json): { "schema": "subject_manifest.v1", "candidate_id": "WORKER-1", "status": "active", "vertical": "unknown", "consent": { "general_pii": {"status": "pending_backfill_review", ...}, "biometric": {"status": "never_collected", ...} }, "retention": {"general_pii_until": "2030-05-02T...", "policy": "4_year_default"}, "datasets": [{"name": "workers_500k", "key_column": "worker_id", "key_value": "1"}] } NOT in this commit (future steps): - Step 4: Wire gateway tool registry to write audit rows on every candidate_id returned (uses SubjectAuditWriter from Step 2) - Step 5: Wire validator WorkerLookup similarly - Step 6: /audit/subject/{id} HTTP endpoint - Step 7: Daily retention sweep - Backfill the full 500K (operator decision: --all when ready; note: 500K JSON files in one dir will slow startup load — may want SQLite/single-file backend before that scale) Operator note: backfill is run-once. To extend to candidates table, re-run with --dataset candidates --key-column candidate_id (no prefix since candidate_id is already the canonical token there). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/catalogd/Cargo.toml | 1 + crates/catalogd/src/bin/backfill_subjects.rs | 336 +++++++++++++++++++ 2 files changed, 337 insertions(+) create mode 100644 crates/catalogd/src/bin/backfill_subjects.rs diff --git a/crates/catalogd/Cargo.toml b/crates/catalogd/Cargo.toml index 327c96e..f436557 100644 --- a/crates/catalogd/Cargo.toml +++ b/crates/catalogd/Cargo.toml @@ -16,6 +16,7 @@ chrono = { workspace = true } uuid = { workspace = true } object_store = { workspace = true } arrow = { workspace = true } +parquet = { workspace = true } sha2 = { workspace = true } hmac = { workspace = true } proto = { path = "../proto" } diff --git a/crates/catalogd/src/bin/backfill_subjects.rs b/crates/catalogd/src/bin/backfill_subjects.rs new file mode 100644 index 0000000..02cec18 --- /dev/null +++ b/crates/catalogd/src/bin/backfill_subjects.rs @@ -0,0 +1,336 @@ +//! Backfill subject manifests from a parquet source. +//! +//! 
Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 3.
//!
//! For each row in the source parquet, creates a SubjectManifest with
//! the BIPA-defensible defaults from the spec:
//! - vertical = unknown (HIPAA fail-closed routing)
//! - consent.general_pii = pending_backfill_review (NOT inferred_existing)
//! - consent.biometric = never_collected (no biometric backfill)
//! - retention.general_pii_until = now + 4 years (default policy)
//!
//! Idempotent: if a manifest already exists for a candidate_id, the
//! backfill skips it. Re-runs are safe.
//!
//! Conservative defaults: processes at most --limit 1000 rows unless
//! --all is passed. --concurrency bounds parallel writes.
//!
//! Usage examples:
//!   cargo run --bin backfill_subjects -- \
//!     --source data/datasets/workers_500k.parquet \
//!     --dataset workers_500k \
//!     --key-column worker_id \
//!     --candidate-id-prefix WORKER- \
//!     --limit 100 \
//!     --dry-run
//!
//!   cargo run --bin backfill_subjects -- \
//!     --source data/datasets/workers_500k.parquet \
//!     --dataset workers_500k \
//!     --key-column worker_id \
//!     --candidate-id-prefix WORKER- \
//!     --safe-view workers_safe \
//!     --all
//!
//! On the live host the binary reads from / writes to ./data so it
//! integrates with the running catalogd's persistence path.

use catalogd::registry::Registry;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use shared::types::{
    BiometricConsent, BiometricConsentStatus, ConsentStatus, GeneralPiiConsent,
    SubjectConsent, SubjectDatasetRef, SubjectManifest, SubjectRetention,
    SubjectStatus, SubjectVertical,
};
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use storaged::backend;

const DEFAULT_LIMIT: usize = 1000;
const DEFAULT_CONCURRENCY: usize = 32;
const DEFAULT_RETENTION_YEARS: i64 = 4;

#[derive(Debug)]
struct Args {
    source: PathBuf,
    dataset_name: String,
    key_column: String,
    candidate_id_prefix: String,
    safe_view: Option<String>,
    storage_root: PathBuf,
    limit: Option<usize>,
    concurrency: usize,
    dry_run: bool,
}

fn parse_args() -> Result<Args, String> {
    let argv: Vec<String> = std::env::args().collect();
    let mut source: Option<PathBuf> = None;
    let mut dataset_name: Option<String> = None;
    let mut key_column: Option<String> = None;
    let mut candidate_id_prefix: String = String::new();
    let mut safe_view: Option<String> = None;
    let mut storage_root: PathBuf = PathBuf::from("./data");
    let mut limit: Option<usize> = Some(DEFAULT_LIMIT);
    let mut concurrency: usize = DEFAULT_CONCURRENCY;
    let mut dry_run = false;

    let mut i = 1;
    while i < argv.len() {
        let arg = &argv[i];
        let next = || -> Result<String, String> {
            argv.get(i + 1).cloned().ok_or_else(|| format!("{arg} needs a value"))
        };
        match arg.as_str() {
            "--source" => { source = Some(PathBuf::from(next()?)); i += 2; }
            "--dataset" => { dataset_name = Some(next()?); i += 2; }
            "--key-column" => { key_column = Some(next()?); i += 2; }
            "--candidate-id-prefix" => { candidate_id_prefix = next()?; i += 2; }
            "--safe-view" => { safe_view = Some(next()?); i += 2; }
            "--storage-root" => { storage_root = PathBuf::from(next()?); i += 2; }
            "--limit" => { limit = Some(next()?.parse().map_err(|e| format!("--limit: {e}"))?); i += 2; }
            "--all" => { limit = None; i += 1; }
            "--concurrency" => { concurrency = next()?.parse().map_err(|e| format!("--concurrency: {e}"))?; i += 2; }
            "--dry-run" => { dry_run = true; i += 1; }
            "-h" | "--help" => { print_help(); std::process::exit(0); }
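            // A typo'd flag on a run-once compliance backfill should abort
            // loudly, never fall through to defaults: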
+ other => return Err(format!("unknown arg: {other}")), + } + } + Ok(Args { + source: source.ok_or("missing --source")?, + dataset_name: dataset_name.ok_or("missing --dataset")?, + key_column: key_column.ok_or("missing --key-column")?, + candidate_id_prefix, + safe_view, + storage_root, + limit, + concurrency, + dry_run, + }) +} + +fn print_help() { + eprintln!("backfill_subjects — populate subject manifests from a parquet source\n"); + eprintln!("Usage: backfill_subjects [flags]\n"); + eprintln!("Required:"); + eprintln!(" --source PATH parquet file containing subject identifiers"); + eprintln!(" --dataset NAME catalogd dataset name (must already exist)"); + eprintln!(" --key-column COL column in the parquet that holds subject identifiers\n"); + eprintln!("Optional:"); + eprintln!(" --candidate-id-prefix STR prefix added to each row's value to form candidate_id"); + eprintln!(" (e.g. 'WORKER-' on a worker_id int64 column)"); + eprintln!(" --safe-view NAME name of an AiView that safely projects this subject's data"); + eprintln!(" --storage-root PATH local-filesystem catalog root (default: ./data)"); + eprintln!(" --limit N process at most N rows (default: {DEFAULT_LIMIT})"); + eprintln!(" --all process every row in the source"); + eprintln!(" --concurrency N max concurrent writes (default: {DEFAULT_CONCURRENCY})"); + eprintln!(" --dry-run parse + count + sample, don't write anything"); +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +async fn main() { + let args = match parse_args() { + Ok(a) => a, + Err(e) => { + eprintln!("error: {e}"); + print_help(); + std::process::exit(2); + } + }; + + if let Err(e) = run(args).await { + eprintln!("backfill failed: {e}"); + std::process::exit(1); + } +} + +async fn run(args: Args) -> Result<(), String> { + eprintln!("[backfill] source: {}", args.source.display()); + eprintln!("[backfill] dataset: {}", args.dataset_name); + eprintln!("[backfill] key: {} (prefix: {:?})", args.key_column, args.candidate_id_prefix); + eprintln!("[backfill] limit: {}", args.limit.map(|n| n.to_string()).unwrap_or("all".into())); + eprintln!("[backfill] dry-run: {}", args.dry_run); + + // Stand up catalogd over the same local FS the running daemon uses. + let store = backend::init_local(args.storage_root.to_str().ok_or("storage-root not utf-8")?); + let reg = Arc::new(Registry::new(store)); + reg.rebuild().await?; + + if reg.get_by_name(&args.dataset_name).await.is_none() { + return Err(format!( + "dataset '{}' not found in catalog at {}; backfill needs the dataset registered first", + args.dataset_name, args.storage_root.display(), + )); + } + eprintln!("[backfill] dataset '{}' exists in catalog ✓", args.dataset_name); + + // Open parquet reader. + let file = File::open(&args.source).map_err(|e| format!("open {}: {e}", args.source.display()))?; + let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| e.to_string())?; + let total_rows = builder.metadata().file_metadata().num_rows() as usize; + eprintln!("[backfill] parquet rows: {total_rows}"); + + // Project only the key column for IO efficiency. 
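    // (ProjectionMask::roots selects by root-column index, which matches the
    // arrow schema index here because the key columns are flat primitives;
    // the reader then decodes only that column's pages.)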
    let schema = builder.schema().clone();
    let col_idx = schema.index_of(&args.key_column)
        .map_err(|_| format!("column '{}' not found; parquet has: {:?}",
            args.key_column,
            schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
        ))?;
    let projection = parquet::arrow::ProjectionMask::roots(builder.parquet_schema(), [col_idx]);
    let mut reader = builder.with_projection(projection).build().map_err(|e| e.to_string())?;

    // Collect candidate IDs (limited).
    let cap = args.limit.unwrap_or(total_rows);
    let mut candidate_ids: Vec<String> = Vec::with_capacity(cap.min(total_rows));
    let mut rows_seen = 0usize;
    'outer: while let Some(batch_result) = reader.next() {
        let batch = batch_result.map_err(|e| format!("batch: {e}"))?;
        let column = batch.column(0); // single projected column
        for row in 0..batch.num_rows() {
            if let Some(limit) = args.limit {
                if candidate_ids.len() >= limit { break 'outer; }
            }
            rows_seen += 1;
            let raw = format_cell_value(column, row)?;
            candidate_ids.push(format!("{}{}", args.candidate_id_prefix, raw));
        }
    }
    eprintln!("[backfill] collected {} candidate_ids ({} rows scanned)", candidate_ids.len(), rows_seen);

    // Sample for the operator.
    eprintln!("[backfill] sample (first 5):");
    for cid in candidate_ids.iter().take(5) {
        eprintln!("  - {cid}");
    }

    if args.dry_run {
        eprintln!("[backfill] DRY-RUN — no writes. Re-run without --dry-run to commit.");
        return Ok(());
    }

    // Write manifests with bounded concurrency.
    let now = chrono::Utc::now();
    let retention_until = now + chrono::Duration::days(365 * DEFAULT_RETENTION_YEARS);
    let safe_views: Vec<String> = args.safe_view.into_iter().collect();
    let dataset_ref = SubjectDatasetRef {
        name: args.dataset_name.clone(),
        key_column: args.key_column.clone(),
        key_value: String::new(), // filled per-subject below
    };

    let attempted = Arc::new(AtomicUsize::new(0));
    let inserted = Arc::new(AtomicUsize::new(0));
    let skipped = Arc::new(AtomicUsize::new(0));
    let failed = Arc::new(AtomicUsize::new(0));
    // Report every ~5% of the run, or every 5K rows, whichever is smaller.
    let progress_every = ((candidate_ids.len() / 20).max(1)).min(5_000);

    // Bounded concurrency via Semaphore.
    let sem = Arc::new(tokio::sync::Semaphore::new(args.concurrency));
    let mut handles = Vec::with_capacity(candidate_ids.len());
    for cid in candidate_ids {
        let permit = sem.clone().acquire_owned().await.map_err(|e| e.to_string())?;
        let reg = reg.clone();
        let mut ds_ref = dataset_ref.clone();
        ds_ref.key_value = cid.trim_start_matches(&args.candidate_id_prefix).to_string();
        let safe_views = safe_views.clone();
        let attempted = attempted.clone();
        let inserted = inserted.clone();
        let skipped = skipped.clone();
        let failed = failed.clone();

        let handle = tokio::spawn(async move {
            let _permit = permit;
            attempted.fetch_add(1, Ordering::Relaxed);
            // Idempotency: skip if subject already exists.
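            // (This is check-then-write, not an atomic upsert. Safe here
            // because the backfill is run-once and nothing else mints these
            // candidate_ids while it runs.)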
            if reg.get_subject(&cid).await.is_some() {
                skipped.fetch_add(1, Ordering::Relaxed);
                return;
            }
            let manifest = SubjectManifest {
                schema: "subject_manifest.v1".into(),
                candidate_id: cid.clone(),
                created_at: now,
                updated_at: now,
                status: SubjectStatus::Active,
                vertical: SubjectVertical::Unknown,
                consent: SubjectConsent {
                    general_pii: GeneralPiiConsent {
                        status: ConsentStatus::PendingBackfillReview,
                        version: String::new(),
                        given_at: None,
                        withdrawn_at: None,
                    },
                    biometric: BiometricConsent {
                        status: BiometricConsentStatus::NeverCollected,
                        retention_until: None,
                    },
                },
                retention: SubjectRetention {
                    general_pii_until: retention_until,
                    policy: "4_year_default".into(),
                },
                datasets: vec![ds_ref],
                safe_views,
                audit_log_path: String::new(),
                audit_log_chain_root: String::new(),
            };
            match reg.put_subject(manifest).await {
                Ok(_) => { inserted.fetch_add(1, Ordering::Relaxed); }
                Err(e) => {
                    failed.fetch_add(1, Ordering::Relaxed);
                    eprintln!("  ✗ {cid}: {e}");
                }
            }
            let cur = attempted.load(Ordering::Relaxed);
            if cur % progress_every == 0 {
                eprintln!("[backfill] progress: {} attempted ({} inserted, {} skipped, {} failed)",
                    cur, inserted.load(Ordering::Relaxed),
                    skipped.load(Ordering::Relaxed),
                    failed.load(Ordering::Relaxed));
            }
        });
        handles.push(handle);
    }
    for h in handles {
        let _ = h.await;
    }

    eprintln!();
    eprintln!("──────────────────────────────────────────────────────────────────");
    eprintln!("[backfill] done");
    eprintln!("[backfill] attempted: {}", attempted.load(Ordering::Relaxed));
    eprintln!("[backfill] inserted:  {}", inserted.load(Ordering::Relaxed));
    eprintln!("[backfill] skipped:   {} (already existed — idempotent re-run)", skipped.load(Ordering::Relaxed));
    eprintln!("[backfill] failed:    {}", failed.load(Ordering::Relaxed));
    eprintln!("[backfill] subjects in catalog now: {}", reg.subjects_count().await);
    Ok(())
}

/// Format a single cell value as a string for use in the candidate_id.
/// Supports the integer + string types most likely to be subject keys.
fn format_cell_value(array: &Arc<dyn arrow::array::Array>, row: usize) -> Result<String, String> {
    use arrow::array::{Array, Int32Array, Int64Array, StringArray, UInt32Array, UInt64Array};
    use arrow::datatypes::DataType;
    if array.is_null(row) {
        return Err(format!("row {row}: key column is null — cannot form candidate_id"));
    }
    let dt = array.data_type();
    let s = match dt {
        DataType::Int64 => array.as_any().downcast_ref::<Int64Array>()
            .ok_or("int64 downcast")?.value(row).to_string(),
        DataType::Int32 => array.as_any().downcast_ref::<Int32Array>()
            .ok_or("int32 downcast")?.value(row).to_string(),
        DataType::UInt64 => array.as_any().downcast_ref::<UInt64Array>()
            .ok_or("uint64 downcast")?.value(row).to_string(),
        DataType::UInt32 => array.as_any().downcast_ref::<UInt32Array>()
            .ok_or("uint32 downcast")?.value(row).to_string(),
        DataType::Utf8 => array.as_any().downcast_ref::<StringArray>()
            .ok_or("utf8 downcast")?.value(row).to_string(),
        other => return Err(format!("unsupported key column type: {other:?}")),
    };
    Ok(s)
}
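
// Reviewer sketch, not part of the spec or the verification notes above:
// a minimal unit test for format_cell_value covering the null, int64, and
// utf8 cases the backfill hits most. Assumes only arrow's stock array
// constructors; extend alongside format_cell_value if key columns grow
// new types.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int64Array, StringArray};

    #[test]
    fn formats_keys_and_rejects_nulls() {
        // Int64 keys (e.g. worker_id) format as their decimal digits.
        let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(1_i64), None]));
        assert_eq!(format_cell_value(&ints, 0).unwrap(), "1");
        // A null key is a hard error: no candidate_id can be minted from it.
        assert!(format_cell_value(&ints, 1).is_err());

        // Utf8 keys pass through unchanged (prefixing happens at the call site).
        let strs: ArrayRef = Arc::new(StringArray::from(vec!["abc-123"]));
        assert_eq!(format_cell_value(&strs, 0).unwrap(), "abc-123");
    }
}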