//! Backfill subject manifests from a parquet source.
//!
//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 3.
//!
//! For each row in the source parquet, creates a SubjectManifest with
//! the BIPA-defensible defaults from the spec:
//!   - vertical = unknown (HIPAA fail-closed routing)
//!   - consent.general_pii = pending_backfill_review (NOT inferred_existing)
//!   - consent.biometric = never_collected (no biometric backfill)
//!   - retention.general_pii_until = now + 4 years (default policy)
//!
//! Idempotent: if a manifest already exists for a candidate_id, the
//! backfill skips it. Re-runs are safe.
//!
//! Conservative defaults: --limit 1000 by default. Pass --all to do
//! the full source. Pass --concurrency to bound parallel writes.
//!
//! Usage examples:
//!
//! ```text
//! cargo run --bin backfill_subjects -- \
//!     --source data/datasets/workers_500k.parquet \
//!     --dataset workers_500k \
//!     --key-column worker_id \
//!     --candidate-id-prefix WORKER- \
//!     --limit 100 \
//!     --dry-run
//!
//! cargo run --bin backfill_subjects -- \
//!     --source data/datasets/workers_500k.parquet \
//!     --dataset workers_500k \
//!     --key-column worker_id \
//!     --candidate-id-prefix WORKER- \
//!     --safe-view workers_safe \
//!     --all
//! ```
//!
//! On the live host the binary reads from / writes to ./data so it
//! integrates with the running catalogd's persistence path.
use catalogd::registry::Registry; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use shared::types::{ BiometricConsent, BiometricConsentStatus, ConsentStatus, GeneralPiiConsent, SubjectConsent, SubjectDatasetRef, SubjectManifest, SubjectRetention, SubjectStatus, SubjectVertical, }; use std::fs::File; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use storaged::backend; const DEFAULT_LIMIT: usize = 1000; const DEFAULT_CONCURRENCY: usize = 32; const DEFAULT_RETENTION_YEARS: i64 = 4; #[derive(Debug)] struct Args { source: PathBuf, dataset_name: String, key_column: String, candidate_id_prefix: String, safe_view: Option, storage_root: PathBuf, limit: Option, concurrency: usize, dry_run: bool, } fn parse_args() -> Result { let argv: Vec = std::env::args().collect(); let mut source: Option = None; let mut dataset_name: Option = None; let mut key_column: Option = None; let mut candidate_id_prefix: String = String::new(); let mut safe_view: Option = None; let mut storage_root: PathBuf = PathBuf::from("./data"); let mut limit: Option = Some(DEFAULT_LIMIT); let mut concurrency: usize = DEFAULT_CONCURRENCY; let mut dry_run = false; let mut i = 1; while i < argv.len() { let arg = &argv[i]; let next = || -> Result { argv.get(i + 1).cloned().ok_or_else(|| format!("{arg} needs a value")) }; match arg.as_str() { "--source" => { source = Some(PathBuf::from(next()?)); i += 2; } "--dataset" => { dataset_name = Some(next()?); i += 2; } "--key-column" => { key_column = Some(next()?); i += 2; } "--candidate-id-prefix" => { candidate_id_prefix = next()?; i += 2; } "--safe-view" => { safe_view = Some(next()?); i += 2; } "--storage-root" => { storage_root = PathBuf::from(next()?); i += 2; } "--limit" => { limit = Some(next()?.parse().map_err(|e| format!("--limit: {e}"))?); i += 2; } "--all" => { limit = None; i += 1; } "--concurrency" => { concurrency = next()?.parse().map_err(|e| format!("--concurrency: {e}"))?; i += 2; } 
"--dry-run" => { dry_run = true; i += 1; } "-h" | "--help" => { print_help(); std::process::exit(0); } other => return Err(format!("unknown arg: {other}")), } } Ok(Args { source: source.ok_or("missing --source")?, dataset_name: dataset_name.ok_or("missing --dataset")?, key_column: key_column.ok_or("missing --key-column")?, candidate_id_prefix, safe_view, storage_root, limit, concurrency, dry_run, }) } fn print_help() { eprintln!("backfill_subjects — populate subject manifests from a parquet source\n"); eprintln!("Usage: backfill_subjects [flags]\n"); eprintln!("Required:"); eprintln!(" --source PATH parquet file containing subject identifiers"); eprintln!(" --dataset NAME catalogd dataset name (must already exist)"); eprintln!(" --key-column COL column in the parquet that holds subject identifiers\n"); eprintln!("Optional:"); eprintln!(" --candidate-id-prefix STR prefix added to each row's value to form candidate_id"); eprintln!(" (e.g. 'WORKER-' on a worker_id int64 column)"); eprintln!(" --safe-view NAME name of an AiView that safely projects this subject's data"); eprintln!(" --storage-root PATH local-filesystem catalog root (default: ./data)"); eprintln!(" --limit N process at most N rows (default: {DEFAULT_LIMIT})"); eprintln!(" --all process every row in the source"); eprintln!(" --concurrency N max concurrent writes (default: {DEFAULT_CONCURRENCY})"); eprintln!(" --dry-run parse + count + sample, don't write anything"); } #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() { let args = match parse_args() { Ok(a) => a, Err(e) => { eprintln!("error: {e}"); print_help(); std::process::exit(2); } }; if let Err(e) = run(args).await { eprintln!("backfill failed: {e}"); std::process::exit(1); } } async fn run(args: Args) -> Result<(), String> { eprintln!("[backfill] source: {}", args.source.display()); eprintln!("[backfill] dataset: {}", args.dataset_name); eprintln!("[backfill] key: {} (prefix: {:?})", args.key_column, 
args.candidate_id_prefix); eprintln!("[backfill] limit: {}", args.limit.map(|n| n.to_string()).unwrap_or("all".into())); eprintln!("[backfill] dry-run: {}", args.dry_run); // Stand up catalogd over the same local FS the running daemon uses. let store = backend::init_local(args.storage_root.to_str().ok_or("storage-root not utf-8")?); let reg = Arc::new(Registry::new(store)); reg.rebuild().await?; if reg.get_by_name(&args.dataset_name).await.is_none() { return Err(format!( "dataset '{}' not found in catalog at {}; backfill needs the dataset registered first", args.dataset_name, args.storage_root.display(), )); } eprintln!("[backfill] dataset '{}' exists in catalog ✓", args.dataset_name); // Open parquet reader. let file = File::open(&args.source).map_err(|e| format!("open {}: {e}", args.source.display()))?; let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| e.to_string())?; let total_rows = builder.metadata().file_metadata().num_rows() as usize; eprintln!("[backfill] parquet rows: {total_rows}"); // Project only the key column for IO efficiency. let schema = builder.schema().clone(); let col_idx = schema.index_of(&args.key_column) .map_err(|_| format!("column '{}' not found; parquet has: {:?}", args.key_column, schema.fields().iter().map(|f| f.name()).collect::>(), ))?; let projection = parquet::arrow::ProjectionMask::roots(builder.parquet_schema(), [col_idx]); let mut reader = builder.with_projection(projection).build().map_err(|e| e.to_string())?; // Collect candidate IDs (limited). 
let cap = args.limit.unwrap_or(total_rows); let mut candidate_ids: Vec = Vec::with_capacity(cap.min(total_rows)); let mut rows_seen = 0usize; 'outer: while let Some(batch_result) = reader.next() { let batch = batch_result.map_err(|e| format!("batch: {e}"))?; let column = batch.column(0); // single projected column for row in 0..batch.num_rows() { if let Some(limit) = args.limit { if candidate_ids.len() >= limit { break 'outer; } } rows_seen += 1; let raw = format_cell_value(column, row)?; candidate_ids.push(format!("{}{}", args.candidate_id_prefix, raw)); } } eprintln!("[backfill] collected {} candidate_ids ({} rows scanned)", candidate_ids.len(), rows_seen); // Sample for the operator. eprintln!("[backfill] sample (first 5):"); for cid in candidate_ids.iter().take(5) { eprintln!(" - {cid}"); } if args.dry_run { eprintln!("[backfill] DRY-RUN — no writes. Re-run without --dry-run to commit."); return Ok(()); } // Write manifests with bounded concurrency. let now = chrono::Utc::now(); let retention_until = now + chrono::Duration::days(365 * DEFAULT_RETENTION_YEARS); let safe_views: Vec = args.safe_view.into_iter().collect(); let dataset_ref = SubjectDatasetRef { name: args.dataset_name.clone(), key_column: args.key_column.clone(), key_value: String::new(), // filled per-subject below }; let attempted = Arc::new(AtomicUsize::new(0)); let inserted = Arc::new(AtomicUsize::new(0)); let skipped = Arc::new(AtomicUsize::new(0)); let failed = Arc::new(AtomicUsize::new(0)); let progress_every = ((candidate_ids.len() / 20).max(1)).min(5_000); // Bounded concurrency via Semaphore. 
let sem = Arc::new(tokio::sync::Semaphore::new(args.concurrency)); let mut handles = Vec::with_capacity(candidate_ids.len()); for (idx, cid) in candidate_ids.into_iter().enumerate() { let permit = sem.clone().acquire_owned().await.map_err(|e| e.to_string())?; let reg = reg.clone(); let mut ds_ref = dataset_ref.clone(); // strip_prefix gives one-shot semantics; trim_start_matches strips // repeated occurrences (a "WORKER-WORKER-1" id would lose both // prefixes). 2026-05-03 opus scrum WARN at backfill_subjects.rs:189. ds_ref.key_value = cid.strip_prefix(&args.candidate_id_prefix) .unwrap_or(&cid).to_string(); let safe_views = safe_views.clone(); let attempted = attempted.clone(); let inserted = inserted.clone(); let skipped = skipped.clone(); let failed = failed.clone(); let handle = tokio::spawn(async move { let _permit = permit; attempted.fetch_add(1, Ordering::Relaxed); // Idempotency: skip if subject already exists. if reg.get_subject(&cid).await.is_some() { skipped.fetch_add(1, Ordering::Relaxed); return; } let manifest = SubjectManifest { schema: "subject_manifest.v1".into(), candidate_id: cid.clone(), created_at: now, updated_at: now, status: SubjectStatus::Active, vertical: SubjectVertical::Unknown, consent: SubjectConsent { general_pii: GeneralPiiConsent { status: ConsentStatus::PendingBackfillReview, version: String::new(), given_at: None, withdrawn_at: None, }, biometric: BiometricConsent { status: BiometricConsentStatus::NeverCollected, retention_until: None, }, }, retention: SubjectRetention { general_pii_until: retention_until, policy: "4_year_default".into(), }, datasets: vec![ds_ref], safe_views, audit_log_path: String::new(), audit_log_chain_root: String::new(), }; match reg.put_subject(manifest).await { Ok(_) => { inserted.fetch_add(1, Ordering::Relaxed); } Err(e) => { failed.fetch_add(1, Ordering::Relaxed); eprintln!(" ✗ {cid}: {e}"); } } let cur = attempted.load(Ordering::Relaxed); if cur % progress_every == 0 { eprintln!("[backfill] progress: {} 
attempted ({} inserted, {} skipped, {} failed)", cur, inserted.load(Ordering::Relaxed), skipped.load(Ordering::Relaxed), failed.load(Ordering::Relaxed)); } let _ = idx; }); handles.push(handle); } for h in handles { let _ = h.await; } eprintln!(); eprintln!("──────────────────────────────────────────────────────────────────"); eprintln!("[backfill] done"); eprintln!("[backfill] attempted: {}", attempted.load(Ordering::Relaxed)); eprintln!("[backfill] inserted: {}", inserted.load(Ordering::Relaxed)); eprintln!("[backfill] skipped: {} (already existed — idempotent re-run)", skipped.load(Ordering::Relaxed)); eprintln!("[backfill] failed: {}", failed.load(Ordering::Relaxed)); eprintln!("[backfill] subjects in catalog now: {}", reg.subjects_count().await); Ok(()) } /// Format a single cell value as a string for use in the candidate_id. /// Supports the integer + string types most likely to be subject keys. fn format_cell_value(array: &Arc, row: usize) -> Result { use arrow::array::{Int32Array, Int64Array, StringArray, UInt32Array, UInt64Array}; use arrow::datatypes::DataType; if array.is_null(row) { return Err(format!("row {row}: key column is null — cannot form candidate_id")); } let dt = array.data_type(); let s = match dt { DataType::Int64 => array.as_any().downcast_ref::() .ok_or("int64 downcast")?.value(row).to_string(), DataType::Int32 => array.as_any().downcast_ref::() .ok_or("int32 downcast")?.value(row).to_string(), DataType::UInt64 => array.as_any().downcast_ref::() .ok_or("uint64 downcast")?.value(row).to_string(), DataType::UInt32 => array.as_any().downcast_ref::() .ok_or("uint32 downcast")?.value(row).to_string(), DataType::Utf8 => array.as_any().downcast_ref::() .ok_or("utf8 downcast")?.value(row).to_string(), other => return Err(format!("unsupported key column type: {other:?}")), }; Ok(s) }