catalogd: Step 3 — backfill_subjects binary (BIPA-defensible defaults)

Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 3.
Reads a parquet source, creates one SubjectManifest per row with the
spec-defined safe defaults, persists via Registry::put_subject().

Defaults baked in (per spec §2 + §5 Step 5):
  - vertical = unknown                     (HIPAA fail-closed)
  - consent.general_pii = pending_backfill_review  (NOT inferred_existing — BIPA defense)
  - consent.biometric  = never_collected   (no biometric data backfilled)
  - retention.general_pii_until = now + 4 years
  - retention.policy = "4_year_default"

Conservative ergonomics:
  - --limit 1000 by default. --all to do the full source.
  - --dry-run for parse + count + sample without writes.
  - --concurrency 32 (bounded via tokio::sync::Semaphore).
  - Idempotent: skips subjects that already exist in catalog.
  - Progress reports every ~5% (or 5K rows, whichever smaller).

Live verification on workers_500k.parquet:
  --limit 100 dry-run:  parsed 100 rows, sampled WORKER-1..5, 0 writes ✓
  --limit 100 commit:   100 inserted, 0 failed, 100 files in
                        data/_catalog/subjects/ ✓
  --limit 100 re-run:   0 inserted, 100 skipped (idempotent) ✓

Sample manifest (data/_catalog/subjects/WORKER-1.json):
  {
    "schema": "subject_manifest.v1",
    "candidate_id": "WORKER-1",
    "status": "active",
    "vertical": "unknown",
    "consent": {
      "general_pii": {"status": "pending_backfill_review", ...},
      "biometric":   {"status": "never_collected",         ...}
    },
    "retention": {"general_pii_until": "2030-05-02T...", "policy": "4_year_default"},
    "datasets": [{"name": "workers_500k", "key_column": "worker_id", "key_value": "1"}]
  }

NOT in this commit (future steps):
  - Step 4: Wire gateway tool registry to write audit rows on every
    candidate_id returned (uses SubjectAuditWriter from Step 2)
  - Step 5: Wire validator WorkerLookup similarly
  - Step 6: /audit/subject/{id} HTTP endpoint
  - Step 7: Daily retention sweep
  - Backfill the full 500K (operator decision: --all when ready;
    note: 500K JSON files in one dir will slow startup load — may
    want SQLite/single-file backend before that scale)

Operator note: backfill is run-once. To extend to candidates table,
re-run with --dataset candidates --key-column candidate_id (no prefix
since candidate_id is already the canonical token there).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-05-03 03:22:54 -05:00
parent d16131bcab
commit bce6dfd1ee
2 changed files with 337 additions and 0 deletions

View File

@ -16,6 +16,7 @@ chrono = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
object_store = { workspace = true } object_store = { workspace = true }
arrow = { workspace = true } arrow = { workspace = true }
parquet = { workspace = true }
sha2 = { workspace = true } sha2 = { workspace = true }
hmac = { workspace = true } hmac = { workspace = true }
proto = { path = "../proto" } proto = { path = "../proto" }

View File

@ -0,0 +1,336 @@
//! Backfill subject manifests from a parquet source.
//!
//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 3.
//!
//! For each row in the source parquet, creates a SubjectManifest with
//! the BIPA-defensible defaults from the spec:
//! - vertical = unknown (HIPAA fail-closed routing)
//! - consent.general_pii = pending_backfill_review (NOT inferred_existing)
//! - consent.biometric = never_collected (no biometric backfill)
//! - retention.general_pii_until = now + 4 years (default policy)
//!
//! Idempotent: if a manifest already exists for a candidate_id, the
//! backfill skips it. Re-runs are safe.
//!
//! Conservative defaults: --limit 1000 by default. Pass --all to do
//! the full source. Pass --concurrency to bound parallel writes.
//!
//! Usage examples:
//! cargo run --bin backfill_subjects -- \
//! --source data/datasets/workers_500k.parquet \
//! --dataset workers_500k \
//! --key-column worker_id \
//! --candidate-id-prefix WORKER- \
//! --limit 100 \
//! --dry-run
//!
//! cargo run --bin backfill_subjects -- \
//! --source data/datasets/workers_500k.parquet \
//! --dataset workers_500k \
//! --key-column worker_id \
//! --candidate-id-prefix WORKER- \
//! --safe-view workers_safe \
//! --all
//!
//! On the live host the binary reads from / writes to ./data so it
//! integrates with the running catalogd's persistence path.
use catalogd::registry::Registry;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use shared::types::{
BiometricConsent, BiometricConsentStatus, ConsentStatus, GeneralPiiConsent,
SubjectConsent, SubjectDatasetRef, SubjectManifest, SubjectRetention,
SubjectStatus, SubjectVertical,
};
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use storaged::backend;
/// Rows processed per run unless overridden by --limit / --all (conservative default).
const DEFAULT_LIMIT: usize = 1000;
/// Upper bound on concurrent manifest writes (tokio::sync::Semaphore permits).
const DEFAULT_CONCURRENCY: usize = 32;
/// Default retention horizon in years: general_pii_until = now + this many
/// years (approximated as 365-day years where applied — see run()).
const DEFAULT_RETENTION_YEARS: i64 = 4;
/// Parsed command-line configuration for one backfill run.
#[derive(Debug)]
struct Args {
    // Parquet file containing subject identifiers (--source).
    source: PathBuf,
    // Catalogd dataset the manifests reference (--dataset); must already be registered.
    dataset_name: String,
    // Column in the parquet holding the subject key (--key-column).
    key_column: String,
    // Prefix prepended to each raw key to form candidate_id (--candidate-id-prefix); may be empty.
    candidate_id_prefix: String,
    // Optional AiView name recorded on each manifest (--safe-view).
    safe_view: Option<String>,
    // Local-filesystem catalog root (--storage-root; default ./data).
    storage_root: PathBuf,
    // Max rows to process; None means the whole source (--limit / --all).
    limit: Option<usize>,
    // Max concurrent manifest writes (--concurrency).
    concurrency: usize,
    // Parse + count + sample only; no writes (--dry-run).
    dry_run: bool,
}
/// Parse process arguments into an [`Args`].
///
/// Returns a human-readable error for an unknown flag, a flag missing its
/// value, a non-numeric --limit/--concurrency, or a missing required
/// option. `-h`/`--help` prints usage and exits the process with code 0.
fn parse_args() -> Result<Args, String> {
    let argv: Vec<String> = std::env::args().collect();

    // Accumulators; required options stay Option<_> until validated below.
    let mut source: Option<PathBuf> = None;
    let mut dataset_name: Option<String> = None;
    let mut key_column: Option<String> = None;
    let mut candidate_id_prefix = String::new();
    let mut safe_view: Option<String> = None;
    let mut storage_root = PathBuf::from("./data");
    let mut limit: Option<usize> = Some(DEFAULT_LIMIT);
    let mut concurrency = DEFAULT_CONCURRENCY;
    let mut dry_run = false;

    let mut pos = 1;
    while pos < argv.len() {
        let flag = argv[pos].clone();
        // Value that must follow a flag at position `at`.
        let value_of = |at: usize| -> Result<String, String> {
            argv.get(at + 1)
                .cloned()
                .ok_or_else(|| format!("{flag} needs a value"))
        };

        // Flags that take a value consume two argv slots; switches consume one.
        let mut consumed = 2;
        match flag.as_str() {
            "--source" => source = Some(PathBuf::from(value_of(pos)?)),
            "--dataset" => dataset_name = Some(value_of(pos)?),
            "--key-column" => key_column = Some(value_of(pos)?),
            "--candidate-id-prefix" => candidate_id_prefix = value_of(pos)?,
            "--safe-view" => safe_view = Some(value_of(pos)?),
            "--storage-root" => storage_root = PathBuf::from(value_of(pos)?),
            "--limit" => {
                limit = Some(value_of(pos)?.parse().map_err(|e| format!("--limit: {e}"))?)
            }
            "--concurrency" => {
                concurrency = value_of(pos)?.parse().map_err(|e| format!("--concurrency: {e}"))?
            }
            "--all" => {
                limit = None;
                consumed = 1;
            }
            "--dry-run" => {
                dry_run = true;
                consumed = 1;
            }
            "-h" | "--help" => {
                print_help();
                std::process::exit(0);
            }
            other => return Err(format!("unknown arg: {other}")),
        }
        pos += consumed;
    }

    Ok(Args {
        source: source.ok_or("missing --source")?,
        dataset_name: dataset_name.ok_or("missing --dataset")?,
        key_column: key_column.ok_or("missing --key-column")?,
        candidate_id_prefix,
        safe_view,
        storage_root,
        limit,
        concurrency,
        dry_run,
    })
}
/// Print CLI usage (flags, defaults, one-line descriptions) to stderr so it
/// never interleaves with any piped stdout.
fn print_help() {
    eprintln!("backfill_subjects — populate subject manifests from a parquet source\n");
    eprintln!("Usage: backfill_subjects [flags]\n");
    eprintln!("Required:");
    eprintln!(" --source PATH parquet file containing subject identifiers");
    eprintln!(" --dataset NAME catalogd dataset name (must already exist)");
    eprintln!(" --key-column COL column in the parquet that holds subject identifiers\n");
    eprintln!("Optional:");
    eprintln!(" --candidate-id-prefix STR prefix added to each row's value to form candidate_id");
    eprintln!(" (e.g. 'WORKER-' on a worker_id int64 column)");
    eprintln!(" --safe-view NAME name of an AiView that safely projects this subject's data");
    eprintln!(" --storage-root PATH local-filesystem catalog root (default: ./data)");
    eprintln!(" --limit N process at most N rows (default: {DEFAULT_LIMIT})");
    eprintln!(" --all process every row in the source");
    eprintln!(" --concurrency N max concurrent writes (default: {DEFAULT_CONCURRENCY})");
    eprintln!(" --dry-run parse + count + sample, don't write anything");
}
/// Entry point: parse flags, then drive the backfill.
///
/// Exit codes: 2 for a usage error (with help text), 1 for a runtime
/// failure, 0 on success.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    let args = parse_args().unwrap_or_else(|e| {
        eprintln!("error: {e}");
        print_help();
        // Diverges, so the closure still satisfies the `Args` return type.
        std::process::exit(2)
    });

    if let Err(e) = run(args).await {
        eprintln!("backfill failed: {e}");
        std::process::exit(1);
    }
}
/// Execute the backfill described by `args`.
///
/// Phases:
///   1. open the catalog over the local filesystem and verify the target
///      dataset is registered (fail fast otherwise);
///   2. scan only the key column of the source parquet, collecting up to
///      `limit` candidate_ids (prefix + raw cell value);
///   3. unless --dry-run, write one SubjectManifest per candidate with
///      bounded concurrency, skipping ids that already exist (idempotent).
///
/// Fatal setup problems return `Err(String)`. Per-subject write failures
/// are counted and logged but never abort the run.
async fn run(args: Args) -> Result<(), String> {
    eprintln!("[backfill] source: {}", args.source.display());
    eprintln!("[backfill] dataset: {}", args.dataset_name);
    eprintln!("[backfill] key: {} (prefix: {:?})", args.key_column, args.candidate_id_prefix);
    // Lazy closure form: the "all" String is only allocated when limit is None.
    eprintln!("[backfill] limit: {}", args.limit.map(|n| n.to_string()).unwrap_or_else(|| "all".into()));
    eprintln!("[backfill] dry-run: {}", args.dry_run);

    // Stand up catalogd over the same local FS the running daemon uses.
    let store = backend::init_local(args.storage_root.to_str().ok_or("storage-root not utf-8")?);
    let reg = Arc::new(Registry::new(store));
    reg.rebuild().await?;

    // Manifests must reference a dataset the catalog actually knows about.
    if reg.get_by_name(&args.dataset_name).await.is_none() {
        return Err(format!(
            "dataset '{}' not found in catalog at {}; backfill needs the dataset registered first",
            args.dataset_name, args.storage_root.display(),
        ));
    }
    eprintln!("[backfill] dataset '{}' exists in catalog ✓", args.dataset_name);

    // Open parquet reader.
    let file = File::open(&args.source).map_err(|e| format!("open {}: {e}", args.source.display()))?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| e.to_string())?;
    let total_rows = builder.metadata().file_metadata().num_rows() as usize;
    eprintln!("[backfill] parquet rows: {total_rows}");

    // Project only the key column for IO efficiency.
    let schema = builder.schema().clone();
    let col_idx = schema.index_of(&args.key_column)
        .map_err(|_| format!("column '{}' not found; parquet has: {:?}",
            args.key_column,
            schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
        ))?;
    let projection = parquet::arrow::ProjectionMask::roots(builder.parquet_schema(), [col_idx]);
    let reader = builder.with_projection(projection).build().map_err(|e| e.to_string())?;

    // Collect candidate IDs (limited).
    let cap = args.limit.unwrap_or(total_rows);
    let mut candidate_ids: Vec<String> = Vec::with_capacity(cap.min(total_rows));
    let mut rows_seen = 0usize;
    'outer: for batch_result in reader {
        let batch = batch_result.map_err(|e| format!("batch: {e}"))?;
        let column = batch.column(0); // single projected column
        for row in 0..batch.num_rows() {
            if let Some(limit) = args.limit {
                if candidate_ids.len() >= limit { break 'outer; }
            }
            rows_seen += 1;
            let raw = format_cell_value(column, row)?;
            candidate_ids.push(format!("{}{}", args.candidate_id_prefix, raw));
        }
    }
    eprintln!("[backfill] collected {} candidate_ids ({} rows scanned)", candidate_ids.len(), rows_seen);

    // Sample for the operator.
    eprintln!("[backfill] sample (first 5):");
    for cid in candidate_ids.iter().take(5) {
        eprintln!(" - {cid}");
    }
    if args.dry_run {
        eprintln!("[backfill] DRY-RUN — no writes. Re-run without --dry-run to commit.");
        return Ok(());
    }

    // Write manifests with bounded concurrency.
    let now = chrono::Utc::now();
    // NOTE(review): "4 years" is approximated as 365*4 days, which runs a
    // leap-day or so short of 4 calendar years — confirm against the spec's
    // retention wording if exact calendar years matter.
    let retention_until = now + chrono::Duration::days(365 * DEFAULT_RETENTION_YEARS);
    let safe_views: Vec<String> = args.safe_view.into_iter().collect();
    let dataset_ref = SubjectDatasetRef {
        name: args.dataset_name.clone(),
        key_column: args.key_column.clone(),
        key_value: String::new(), // filled per-subject below
    };
    let attempted = Arc::new(AtomicUsize::new(0));
    let inserted = Arc::new(AtomicUsize::new(0));
    let skipped = Arc::new(AtomicUsize::new(0));
    let failed = Arc::new(AtomicUsize::new(0));
    // Report every ~5% of the run, but never let more than 5K rows pass silently.
    let progress_every = ((candidate_ids.len() / 20).max(1)).min(5_000);

    // Bounded concurrency via Semaphore: acquire before spawning so at most
    // `concurrency` writes are in flight.
    let sem = Arc::new(tokio::sync::Semaphore::new(args.concurrency));
    let mut handles = Vec::with_capacity(candidate_ids.len());
    for cid in candidate_ids {
        let permit = sem.clone().acquire_owned().await.map_err(|e| e.to_string())?;
        let reg = reg.clone();
        let mut ds_ref = dataset_ref.clone();
        // Recover the raw key by removing exactly the one prefix prepended
        // during collection. BUG FIX: trim_start_matches() strips the prefix
        // *repeatedly*, corrupting key_value whenever the raw key itself
        // begins with the prefix (prefix "W" + key "W1" -> cid "WW1" ->
        // trimmed "1" instead of "W1"). strip_prefix removes it once; the
        // unwrap_or fallback covers the (shouldn't-happen) no-match case.
        ds_ref.key_value = cid
            .strip_prefix(&args.candidate_id_prefix)
            .unwrap_or(&cid)
            .to_string();
        let safe_views = safe_views.clone();
        let attempted = attempted.clone();
        let inserted = inserted.clone();
        let skipped = skipped.clone();
        let failed = failed.clone();
        let handle = tokio::spawn(async move {
            // Permit is released when this task finishes (dropped here).
            let _permit = permit;
            attempted.fetch_add(1, Ordering::Relaxed);
            // Idempotency: skip if subject already exists.
            if reg.get_subject(&cid).await.is_some() {
                skipped.fetch_add(1, Ordering::Relaxed);
                return;
            }
            // BIPA-defensible defaults per spec §2/§5: unknown vertical,
            // pending_backfill_review consent, never_collected biometric.
            let manifest = SubjectManifest {
                schema: "subject_manifest.v1".into(),
                candidate_id: cid.clone(),
                created_at: now,
                updated_at: now,
                status: SubjectStatus::Active,
                vertical: SubjectVertical::Unknown,
                consent: SubjectConsent {
                    general_pii: GeneralPiiConsent {
                        status: ConsentStatus::PendingBackfillReview,
                        version: String::new(),
                        given_at: None,
                        withdrawn_at: None,
                    },
                    biometric: BiometricConsent {
                        status: BiometricConsentStatus::NeverCollected,
                        retention_until: None,
                    },
                },
                retention: SubjectRetention {
                    general_pii_until: retention_until,
                    policy: "4_year_default".into(),
                },
                datasets: vec![ds_ref],
                safe_views,
                audit_log_path: String::new(),
                audit_log_chain_root: String::new(),
            };
            match reg.put_subject(manifest).await {
                Ok(_) => { inserted.fetch_add(1, Ordering::Relaxed); }
                Err(e) => {
                    failed.fetch_add(1, Ordering::Relaxed);
                    eprintln!("{cid}: {e}");
                }
            }
            let cur = attempted.load(Ordering::Relaxed);
            if cur % progress_every == 0 {
                eprintln!("[backfill] progress: {} attempted ({} inserted, {} skipped, {} failed)",
                    cur, inserted.load(Ordering::Relaxed),
                    skipped.load(Ordering::Relaxed),
                    failed.load(Ordering::Relaxed));
            }
        });
        handles.push(handle);
    }
    // Wait for every spawned write; panics in tasks are ignored (counted as
    // neither inserted nor failed).
    for h in handles {
        let _ = h.await;
    }

    eprintln!();
    eprintln!("──────────────────────────────────────────────────────────────────");
    eprintln!("[backfill] done");
    eprintln!("[backfill] attempted: {}", attempted.load(Ordering::Relaxed));
    eprintln!("[backfill] inserted: {}", inserted.load(Ordering::Relaxed));
    eprintln!("[backfill] skipped: {} (already existed — idempotent re-run)", skipped.load(Ordering::Relaxed));
    eprintln!("[backfill] failed: {}", failed.load(Ordering::Relaxed));
    eprintln!("[backfill] subjects in catalog now: {}", reg.subjects_count().await);
    Ok(())
}
/// Render one cell of the projected key column as a string for candidate_id
/// construction. Covers the integer and utf8 types most likely to serve as
/// subject keys; nulls and any other arrow type are reported as errors.
fn format_cell_value(array: &Arc<dyn arrow::array::Array>, row: usize) -> Result<String, String> {
    use arrow::array::{Int32Array, Int64Array, StringArray, UInt32Array, UInt64Array};
    use arrow::datatypes::DataType;

    // A null key can never identify a subject — refuse rather than invent one.
    if array.is_null(row) {
        return Err(format!("row {row}: key column is null — cannot form candidate_id"));
    }

    // Downcast to the concrete array for the declared type, then stringify
    // the cell. The downcast cannot fail once data_type() matched, but the
    // error label is kept for defense in depth.
    macro_rules! cell {
        ($arr:ty, $label:literal) => {
            array
                .as_any()
                .downcast_ref::<$arr>()
                .ok_or($label)?
                .value(row)
                .to_string()
        };
    }

    match array.data_type() {
        DataType::Int64 => Ok(cell!(Int64Array, "int64 downcast")),
        DataType::Int32 => Ok(cell!(Int32Array, "int32 downcast")),
        DataType::UInt64 => Ok(cell!(UInt64Array, "uint64 downcast")),
        DataType::UInt32 => Ok(cell!(UInt32Array, "uint32 downcast")),
        DataType::Utf8 => Ok(cell!(StringArray, "utf8 downcast")),
        other => Err(format!("unsupported key column type: {other:?}")),
    }
}