From d16131bcab86801cf57a7b23a9474fa0c7d47be5 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 3 May 2026 03:19:18 -0500 Subject: [PATCH] =?UTF-8?q?catalogd:=20Step=202=20=E2=80=94=20SubjectAudit?= =?UTF-8?q?Writer=20with=20HMAC=20chain?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md Step 2. Per-subject append-only audit JSONL with HMAC-SHA256 chain. Local-first — no Vault, no external anchor (those are v2 if SOC2 Type II becomes contract-required; v1 deliberately stays small). shared/types.rs additions: - AuditAccessor — kind, daemon, purpose, trace_id - SubjectAuditRow — schema/ts/candidate_id/accessor/fields_accessed/ result/prev_chain_hash/row_hmac crates/catalogd/src/subject_audit.rs (NEW): - SubjectAuditWriter — holds signing key + per-subject latest-hash cache - from_key_file() — loads key from sealed file, requires ≥32 bytes - with_inline_key() — for tests + bring-up - append() — computes HMAC chain link, persists JSONL row, returns new chain root (caller mirrors to SubjectManifest.audit_log_chain_root) - verify_chain() — full re-verification of a subject's audit log, catches both prev_hash drift AND row-level HMAC tampering - scan_latest_hash() — cold-start path, finds prev_hash from JSONL tail - append_line() — read-modify-write pattern (object stores have no native append; same shape as the rest of catalogd's persistence) Crypto: HMAC-SHA256 via the standard `hmac` crate (added to workspace + catalogd deps; not implementing crypto by hand). Output is lowercase hex matching the rest of the codebase's SHA-256 conventions. Security choices: - NO Debug impl on SubjectAuditWriter — auto-deriving Debug would risk leaking the signing key into log lines. Tests work around this by matching on Result instead of using .unwrap_err(). - Key min length 32 bytes (the SHA-256 output length; RFC 2104 recommends HMAC keys at least as long as the hash output — note this is the output size, not the 64-byte block size). 
- Failures are NOT swallowed — Result returned, caller decides whether to log + continue (per spec §3.2 the gateway tool registry SHOULD log + continue rather than block reads). Tests (7/7 passing): - first_append_uses_genesis_prev_hash - chain_links_each_append (3-row chain verifies) - separate_subjects_have_independent_chains (per-subject isolation) - tamper_detected_on_verify (mutation in middle of chain breaks verify) - cold_writer_picks_up_existing_chain (process restart preserves chain) - empty_candidate_id_rejected - key_too_short_rejected_via_file NOT in this commit (future steps): - Step 3: Backfill ETL from workers_500k.parquet (next per J) - Step 4: Wire gateway tool registry to call append() on every candidate_id returned by search_candidates / get_candidate - Step 5: Wire validator WorkerLookup similarly - Step 6: /audit/subject/{id} HTTP endpoint - Step 7: Daily retention sweep - Mirroring chain root to SubjectManifest.audit_log_chain_root (separate concern; do at the call site) cargo check --workspace clean. cargo test -p catalogd subject_audit 7/7 PASS. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.toml | 1 + crates/catalogd/Cargo.toml | 2 + crates/catalogd/src/lib.rs | 1 + crates/catalogd/src/subject_audit.rs | 360 +++++++++++++++++++++++++++ crates/shared/src/types.rs | 44 ++++ 5 files changed, 408 insertions(+) create mode 100644 crates/catalogd/src/subject_audit.rs diff --git a/Cargo.toml b/Cargo.toml index c4566ed..74b2595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ datafusion = "47" bytes = "1" futures = "0.3" sha2 = "0.10" +hmac = "0.12" url = "2" tonic = "0.13" prost = "0.13" diff --git a/crates/catalogd/Cargo.toml b/crates/catalogd/Cargo.toml index 1884d43..327c96e 100644 --- a/crates/catalogd/Cargo.toml +++ b/crates/catalogd/Cargo.toml @@ -16,5 +16,7 @@ chrono = { workspace = true } uuid = { workspace = true } object_store = { workspace = true } arrow = { workspace = true } +sha2 = { workspace = true } +hmac = { workspace = true } proto = { path = "../proto" } tonic = { workspace = true } diff --git a/crates/catalogd/src/lib.rs b/crates/catalogd/src/lib.rs index 331cdb5..1d02429 100644 --- a/crates/catalogd/src/lib.rs +++ b/crates/catalogd/src/lib.rs @@ -2,3 +2,4 @@ pub mod registry; pub mod service; pub mod grpc; pub mod tombstones; +pub mod subject_audit; diff --git a/crates/catalogd/src/subject_audit.rs b/crates/catalogd/src/subject_audit.rs new file mode 100644 index 0000000..c26d087 --- /dev/null +++ b/crates/catalogd/src/subject_audit.rs @@ -0,0 +1,360 @@ +//! Subject audit log writer with HMAC chain. +//! +//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §3. +//! +//! Per-subject append-only JSONL at `_catalog/subjects/.audit.jsonl`. +//! Each row's `row_hmac` is HMAC-SHA256(key, prev_chain_hash || canonical_json_of_row_minus_hmac). +//! The chain root is mirrored to the SubjectManifest.audit_log_chain_root +//! field after each append, so verifiers can find the latest valid root +//! from the manifest alone. +//! +//! 
Failures are surfaced, not swallowed — append() returns a Result and the +//! caller decides whether to log + continue (per spec §3.2 the gateway +//! tool registry SHOULD log + continue rather than block reads) or to +//! propagate. Operators MUST monitor the error count. +//! +//! Key loading: from a sealed file at the path the writer is constructed +//! with. Tests use `with_inline_key` to inject a deterministic key. + +use bytes::Bytes; +use hmac::{Hmac, Mac}; +use object_store::ObjectStore; +use sha2::Sha256; +use shared::types::SubjectAuditRow; +use std::collections::HashMap; +use std::sync::Arc; +use storaged::ops; +use tokio::sync::Mutex; + +const SUBJECT_PREFIX: &str = "_catalog/subjects"; +pub const GENESIS_HASH: &str = "GENESIS"; + +type HmacSha256 = Hmac; + +/// Per-subject audit log writer. Holds the signing key and a small +/// in-memory cache of latest chain hash per subject (so we don't read +/// the JSONL file on every append). +/// +/// **No Debug impl deliberately** — auto-deriving Debug on this struct +/// would risk leaking the signing key into log lines. Tests that need +/// `Result::unwrap_err` on a function returning `Result` must +/// match on the result instead of using `.unwrap_err()`. +#[derive(Clone)] +pub struct SubjectAuditWriter { + store: Arc, + /// HMAC signing key. Loaded once at construction. Never logged. + /// `Vec` not `String` — keys are bytes, not text. + key: Arc>, + /// In-memory cache of the latest chain hash per candidate_id. + /// Loaded lazily from the audit JSONL on first append per subject. + /// Mutex (not RwLock) because every append both reads and writes. + latest_hash: Arc>>, +} + +impl SubjectAuditWriter { + /// Construct a writer that loads its signing key from a file at + /// `key_path`. The file MUST be mode 0400 owner-only on production + /// hosts (operator's responsibility — not enforced here because the + /// permission check would be a footgun in tests/dev). 
+ pub async fn from_key_file( + store: Arc, + key_path: &std::path::Path, + ) -> Result { + let key = tokio::fs::read(key_path) + .await + .map_err(|e| format!("read signing key from {}: {e}", key_path.display()))?; + if key.len() < 32 { + return Err(format!( + "signing key is {} bytes; recommend ≥32 bytes for HMAC-SHA256", + key.len() + )); + } + Ok(Self { + store, + key: Arc::new(key), + latest_hash: Arc::new(Mutex::new(HashMap::new())), + }) + } + + /// Construct a writer with an inline key. Intended for tests + bring-up. + /// Production should use `from_key_file`. + pub fn with_inline_key(store: Arc, key: Vec) -> Self { + Self { + store, + key: Arc::new(key), + latest_hash: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn audit_key(candidate_id: &str) -> String { + let safe: String = candidate_id + .chars() + .map(|c| { + if c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.' { + c + } else { + '_' + } + }) + .collect(); + format!("{SUBJECT_PREFIX}/{}.audit.jsonl", safe) + } + + /// Compute HMAC-SHA256(key, prev_hash_bytes || canonical_row_bytes). + /// Returns the HMAC as lowercase hex. + fn compute_hmac(&self, prev_hash: &str, canonical_row: &[u8]) -> String { + let mut mac = ::new_from_slice(&self.key) + .expect("HMAC accepts any key length"); + mac.update(prev_hash.as_bytes()); + mac.update(canonical_row); + let result = mac.finalize().into_bytes(); + let mut s = String::with_capacity(64); + for byte in result { + s.push_str(&format!("{:02x}", byte)); + } + s + } + + /// Append one audit row for a subject. Computes the HMAC chain link, + /// writes to the per-subject JSONL, returns the new chain root (which + /// the caller MAY mirror to SubjectManifest.audit_log_chain_root). + /// + /// On error: logs and returns Err. Caller decides whether to propagate + /// (per spec §3.2 the gateway tool registry SHOULD log + drop). 
+ pub async fn append(&self, mut row: SubjectAuditRow) -> Result { + let cid = row.candidate_id.clone(); + if cid.is_empty() { + return Err("audit row candidate_id is empty".into()); + } + + // 1. Find prev_chain_hash. Cache hot path; cold path scans + // the existing JSONL tail-style to find the last row. + let prev = { + let cache = self.latest_hash.lock().await; + cache.get(&cid).cloned() + }; + let prev_hash = match prev { + Some(h) => h, + None => self.scan_latest_hash(&cid).await, + }; + row.prev_chain_hash = prev_hash.clone(); + row.row_hmac = String::new(); // Always cleared before HMAC computation. + + // 2. Canonical-JSON the row WITHOUT the row_hmac field, compute MAC. + let canon = serde_json::to_vec(&row) + .map_err(|e| format!("canonicalize audit row: {e}"))?; + let new_hmac = self.compute_hmac(&prev_hash, &canon); + row.row_hmac = new_hmac.clone(); + + // 3. Append the row to the JSONL. + let line = serde_json::to_vec(&row) + .map_err(|e| format!("serialize audit row: {e}"))?; + self.append_line(&cid, &line).await?; + + // 4. Update cache. + self.latest_hash.lock().await.insert(cid, new_hmac.clone()); + + Ok(new_hmac) + } + + /// Scan the audit JSONL for a subject and return the last row's + /// row_hmac. GENESIS if no log exists or it's empty. + /// Object-store-aware: works across LocalFileSystem + InMemory + S3. + async fn scan_latest_hash(&self, candidate_id: &str) -> String { + let key = Self::audit_key(candidate_id); + let bytes = match ops::get(&self.store, &key).await { + Ok(b) => b, + Err(_) => return GENESIS_HASH.to_string(), // file doesn't exist + }; + // Find the last newline-delimited line that parses as a row. 
+ let text = match std::str::from_utf8(&bytes) { + Ok(t) => t, + Err(_) => return GENESIS_HASH.to_string(), + }; + for line in text.lines().rev() { + if line.trim().is_empty() { + continue; + } + if let Ok(parsed) = serde_json::from_str::(line) { + return parsed.row_hmac; + } + } + GENESIS_HASH.to_string() + } + + /// Append a single newline-terminated line to the per-subject log. + /// Reads the existing content (may be empty), appends, writes back. + /// Object stores don't have native append; this is the standard + /// "read-modify-write the tail file" pattern. + async fn append_line(&self, candidate_id: &str, line: &[u8]) -> Result<(), String> { + let key = Self::audit_key(candidate_id); + let existing = ops::get(&self.store, &key).await.unwrap_or_default(); + let mut buf = Vec::with_capacity(existing.len() + line.len() + 1); + buf.extend_from_slice(&existing); + if !buf.is_empty() && !buf.ends_with(b"\n") { + buf.push(b'\n'); + } + buf.extend_from_slice(line); + buf.push(b'\n'); + ops::put(&self.store, &key, Bytes::from(buf)).await + } + + /// Verify the full HMAC chain for a subject. Returns Ok(rows_verified) + /// or Err with the first chain break encountered. 
+ pub async fn verify_chain(&self, candidate_id: &str) -> Result { + let key = Self::audit_key(candidate_id); + let bytes = ops::get(&self.store, &key).await + .map_err(|e| format!("read audit log for {candidate_id}: {e}"))?; + let text = std::str::from_utf8(&bytes) + .map_err(|e| format!("audit log not utf-8: {e}"))?; + let mut prev = GENESIS_HASH.to_string(); + let mut count = 0usize; + for (lineno, line) in text.lines().enumerate() { + if line.trim().is_empty() { + continue; + } + let mut row: SubjectAuditRow = serde_json::from_str(line) + .map_err(|e| format!("line {} parse failed: {e}", lineno + 1))?; + if row.prev_chain_hash != prev { + return Err(format!( + "chain break at line {}: prev_chain_hash={} expected={}", + lineno + 1, row.prev_chain_hash, prev, + )); + } + let claimed = std::mem::take(&mut row.row_hmac); + let canon = serde_json::to_vec(&row) + .map_err(|e| format!("canonicalize line {}: {e}", lineno + 1))?; + let recomputed = self.compute_hmac(&prev, &canon); + if recomputed != claimed { + return Err(format!( + "hmac mismatch at line {}: stored={} recomputed={}", + lineno + 1, claimed, recomputed, + )); + } + prev = claimed; + count += 1; + } + Ok(count) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use object_store::memory::InMemory; + use shared::types::AuditAccessor; + + fn fixture_writer() -> SubjectAuditWriter { + SubjectAuditWriter::with_inline_key( + Arc::new(InMemory::new()), + // Deterministic 32-byte key for test reproducibility. 
+ (0u8..32).collect(), + ) + } + + fn fixture_row(candidate_id: &str, fields: &[&str]) -> SubjectAuditRow { + SubjectAuditRow { + schema: "subject_audit.v1".into(), + ts: chrono::Utc::now(), + candidate_id: candidate_id.into(), + accessor: AuditAccessor { + kind: "gateway_lookup".into(), + daemon: "gateway".into(), + purpose: "fill_validation".into(), + trace_id: String::new(), + }, + fields_accessed: fields.iter().map(|s| s.to_string()).collect(), + result: "success".into(), + prev_chain_hash: String::new(), // filled in by writer + row_hmac: String::new(), // filled in by writer + } + } + + #[tokio::test] + async fn first_append_uses_genesis_prev_hash() { + let w = fixture_writer(); + let new_root = w.append(fixture_row("CAND-1", &["name"])).await.unwrap(); + assert!(!new_root.is_empty()); + assert_eq!(new_root.len(), 64); // SHA-256 hex + // After first append, the only row should reference GENESIS as prev. + let bytes = ops::get(&w.store, &SubjectAuditWriter::audit_key("CAND-1")) + .await.unwrap(); + let text = std::str::from_utf8(&bytes).unwrap(); + let row: SubjectAuditRow = serde_json::from_str(text.trim()).unwrap(); + assert_eq!(row.prev_chain_hash, "GENESIS"); + assert_eq!(row.row_hmac, new_root); + } + + #[tokio::test] + async fn chain_links_each_append() { + let w = fixture_writer(); + let h1 = w.append(fixture_row("CAND-2", &["name"])).await.unwrap(); + let h2 = w.append(fixture_row("CAND-2", &["phone"])).await.unwrap(); + let h3 = w.append(fixture_row("CAND-2", &["email"])).await.unwrap(); + assert_ne!(h1, h2); + assert_ne!(h2, h3); + let count = w.verify_chain("CAND-2").await.unwrap(); + assert_eq!(count, 3); + } + + #[tokio::test] + async fn separate_subjects_have_independent_chains() { + let w = fixture_writer(); + let _ = w.append(fixture_row("CAND-A", &["name"])).await.unwrap(); + let _ = w.append(fixture_row("CAND-B", &["name"])).await.unwrap(); + let _ = w.append(fixture_row("CAND-A", &["phone"])).await.unwrap(); + 
assert_eq!(w.verify_chain("CAND-A").await.unwrap(), 2); + assert_eq!(w.verify_chain("CAND-B").await.unwrap(), 1); + } + + #[tokio::test] + async fn tamper_detected_on_verify() { + let w = fixture_writer(); + let _ = w.append(fixture_row("CAND-T", &["name"])).await.unwrap(); + let _ = w.append(fixture_row("CAND-T", &["phone"])).await.unwrap(); + // Mutate the file to flip a `result` field on the first row. + let key = SubjectAuditWriter::audit_key("CAND-T"); + let bytes = ops::get(&w.store, &key).await.unwrap(); + let mut text = std::str::from_utf8(&bytes).unwrap().to_string(); + text = text.replacen("\"success\"", "\"denied\"", 1); + ops::put(&w.store, &key, text.into_bytes().into()).await.unwrap(); + let err = w.verify_chain("CAND-T").await.unwrap_err(); + assert!(err.contains("hmac mismatch"), "expected hmac mismatch, got: {err}"); + } + + #[tokio::test] + async fn cold_writer_picks_up_existing_chain() { + let store: Arc = Arc::new(InMemory::new()); + let key: Vec = (0u8..32).collect(); + let w1 = SubjectAuditWriter::with_inline_key(store.clone(), key.clone()); + let h1 = w1.append(fixture_row("CAND-COLD", &["name"])).await.unwrap(); + // Drop w1's cache, simulate process restart. + drop(w1); + let w2 = SubjectAuditWriter::with_inline_key(store.clone(), key); + let h2 = w2.append(fixture_row("CAND-COLD", &["phone"])).await.unwrap(); + assert_ne!(h1, h2); + assert_eq!(w2.verify_chain("CAND-COLD").await.unwrap(), 2); + } + + #[tokio::test] + async fn empty_candidate_id_rejected() { + let w = fixture_writer(); + let row = fixture_row("", &["name"]); + let err = w.append(row).await.unwrap_err(); + assert!(err.contains("empty"), "got: {err}"); + } + + #[tokio::test] + async fn key_too_short_rejected_via_file() { + // Write a 16-byte key file (under the 32-byte minimum). 
+ let tmp = std::env::temp_dir().join(format!("sa_key_short_{}.bin", std::process::id())); + tokio::fs::write(&tmp, vec![0u8; 16]).await.unwrap(); + let store: Arc = Arc::new(InMemory::new()); + // `unwrap_err` would require Debug on SubjectAuditWriter, which + // we deliberately don't impl (key-leak risk). Match instead. + match SubjectAuditWriter::from_key_file(store, &tmp).await { + Err(e) => assert!(e.contains("≥32 bytes"), "got: {e}"), + Ok(_) => panic!("expected too-short-key rejection"), + } + let _ = tokio::fs::remove_file(&tmp).await; + } +} diff --git a/crates/shared/src/types.rs b/crates/shared/src/types.rs index 2d3465a..370dccd 100644 --- a/crates/shared/src/types.rs +++ b/crates/shared/src/types.rs @@ -510,3 +510,47 @@ pub struct SubjectManifest { } fn default_subject_manifest_schema() -> String { "subject_manifest.v1".into() } + +/// Audit accessor — who/why touched a subject's PII. +/// Persisted in subject_audit.v1 rows; never logged in plaintext outside +/// the per-subject audit file. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuditAccessor { + /// One of: "gateway_lookup", "audit_response", "legal_request", + /// "system_resolve". Caller-defined; SubjectAuditWriter does not + /// validate the kind string. + pub kind: String, + /// Daemon name (e.g. "gateway", "validator", "catalogd"). + pub daemon: String, + /// Purpose token (e.g. "fill_validation", "audit_subject_response"). + pub purpose: String, + /// X-Lakehouse-Trace-Id for cross-daemon trace correlation. Empty + /// when the request didn't carry one. + #[serde(default)] + pub trace_id: String, +} + +/// One audit row in a subject's per-subject audit JSONL. +/// HMAC chain: row_hmac = HMAC-SHA256(key, prev_chain_hash || canon(row - row_hmac)) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubjectAuditRow { + /// Always "subject_audit.v1" for v1. 
+ #[serde(default = "default_subject_audit_schema")] + pub schema: String, + pub ts: chrono::DateTime, + pub candidate_id: String, + pub accessor: AuditAccessor, + /// Field names accessed (e.g. ["name"], ["name", "phone"]). + pub fields_accessed: Vec, + /// "success" | "denied" | "not_found" | "error". + pub result: String, + /// Hash of the previous row in this subject's audit log. "GENESIS" + /// for the first row. + pub prev_chain_hash: String, + /// HMAC-SHA256 over (prev_chain_hash || canonical_json(row - row_hmac)). + /// Computed by SubjectAuditWriter; callers leave empty on input. + #[serde(default)] + pub row_hmac: String, +} + +fn default_subject_audit_schema() -> String { "subject_audit.v1".into() }