catalogd: Step 2 — SubjectAuditWriter with HMAC chain

Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md Step 2.
Per-subject append-only audit JSONL with HMAC-SHA256 chain. Local-first
— no Vault, no external anchor (those are v2 if SOC2 Type II becomes
contract-required; v1 deliberately stays small).

shared/types.rs additions:
- AuditAccessor — kind, daemon, purpose, trace_id
- SubjectAuditRow — schema/ts/candidate_id/accessor/fields_accessed/
  result/prev_chain_hash/row_hmac
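A persisted row might look like the following (illustrative values only — the timestamp format and the 64-hex-char `row_hmac` placeholder are assumptions, not output from this commit):

```json
{"schema":"subject_audit.v1","ts":"2026-05-03T08:19:18Z","candidate_id":"CAND-1","accessor":{"kind":"gateway_lookup","daemon":"gateway","purpose":"fill_validation","trace_id":""},"fields_accessed":["name"],"result":"success","prev_chain_hash":"GENESIS","row_hmac":"<64 lowercase hex chars>"}
```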

crates/catalogd/src/subject_audit.rs (NEW):
- SubjectAuditWriter — holds signing key + per-subject latest-hash cache
- from_key_file() — loads key from sealed file, requires ≥32 bytes
- with_inline_key() — for tests + bring-up
- append() — computes HMAC chain link, persists JSONL row, returns new
  chain root (caller mirrors to SubjectManifest.audit_log_chain_root)
- verify_chain() — full re-verification of a subject's audit log,
  catches both prev_hash drift AND row-level HMAC tampering
- scan_latest_hash() — cold-start path, finds prev_hash from JSONL tail
- append_line() — read-modify-write pattern (object stores have no
  native append; same shape as the rest of catalogd's persistence)
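The chain/verify mechanics above can be sketched in miniature. This is a stdlib-only illustration: `DefaultHasher` stands in for the real keyed HMAC-SHA256 (the actual implementation uses the `hmac` + `sha2` crates), so it is not tamper-proof — it only shows how links and verification compose.

```rust
// Sketch only: DefaultHasher is a stand-in for keyed HMAC-SHA256.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const GENESIS: &str = "GENESIS";

// link = H(prev_hash || row_bytes), rendered as lowercase hex.
fn link(prev: &str, row: &str) -> String {
    let mut h = DefaultHasher::new();
    prev.hash(&mut h);
    row.hash(&mut h);
    format!("{:016x}", h.finish())
}

// Walk the log front-to-back, recomputing each link.
fn verify(log: &[(String, String)]) -> Result<usize, String> {
    let mut prev = GENESIS.to_string();
    for (i, (row, stored)) in log.iter().enumerate() {
        let recomputed = link(&prev, row);
        if recomputed != *stored {
            return Err(format!("break at row {i}"));
        }
        prev = recomputed;
    }
    Ok(log.len())
}

fn main() {
    let mut log: Vec<(String, String)> = Vec::new();
    let mut prev = GENESIS.to_string();
    for row in ["row-1", "row-2", "row-3"] {
        let h = link(&prev, row);
        log.push((row.to_string(), h.clone()));
        prev = h;
    }
    assert_eq!(verify(&log), Ok(3));
    // Tamper with the middle row: verification fails from there on.
    log[1].0 = "row-2-tampered".to_string();
    assert!(verify(&log).is_err());
}
```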

Crypto: HMAC-SHA256 via the standard `hmac` crate (added to workspace
+ catalogd deps; not implementing crypto by hand). Output is lowercase
hex matching the rest of the codebase's SHA-256 conventions.

Security choices:
- NO Debug impl on SubjectAuditWriter — auto-deriving Debug would risk
  leaking the signing key into log lines. Tests work around this by
  matching on Result instead of using .unwrap_err().
- Key min length 32 bytes (HMAC-SHA256 output size; RFC 2104 recommends
  keys at least as long as the hash output — the block size is 64 bytes).
- Failures are NOT swallowed — Result returned, caller decides whether
  to log + continue (per spec §3.2 the gateway tool registry SHOULD
  log + continue rather than block reads).
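The intended call-site shape (Step 4, not in this commit) follows from that last point. This is a hypothetical sketch — `append_audit` and `serve_candidate` stand in for `SubjectAuditWriter::append` and the gateway handler — showing the "log + continue" policy: an audit failure is logged and the read still proceeds.

```rust
// Hypothetical Step 4 call site — a sketch of "log + continue",
// not code from this commit.
fn append_audit(candidate_id: &str) -> Result<String, String> {
    // Stand-in for SubjectAuditWriter::append.
    if candidate_id.is_empty() {
        return Err("audit row candidate_id is empty".into());
    }
    Ok(format!("chain-root-for-{candidate_id}"))
}

fn serve_candidate(candidate_id: &str) -> &'static str {
    // The read is served regardless; an audit failure is logged
    // and counted, but deliberately NOT propagated to the client.
    match append_audit(candidate_id) {
        Ok(root) => eprintln!("audit ok, new chain root {root}"),
        Err(e) => eprintln!("audit append failed (continuing): {e}"),
    }
    "candidate payload"
}

fn main() {
    assert_eq!(serve_candidate("CAND-1"), "candidate payload");
    assert_eq!(serve_candidate(""), "candidate payload"); // still served
}
```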

Tests (7/7 passing):
- first_append_uses_genesis_prev_hash
- chain_links_each_append (3-row chain verifies)
- separate_subjects_have_independent_chains (per-subject isolation)
- tamper_detected_on_verify (mutation in middle of chain breaks verify)
- cold_writer_picks_up_existing_chain (process restart preserves chain)
- empty_candidate_id_rejected
- key_too_short_rejected_via_file

NOT in this commit (future steps):
- Step 3: Backfill ETL from workers_500k.parquet (next per J)
- Step 4: Wire gateway tool registry to call append() on every
  candidate_id returned by search_candidates / get_candidate
- Step 5: Wire validator WorkerLookup similarly
- Step 6: /audit/subject/{id} HTTP endpoint
- Step 7: Daily retention sweep
- Mirroring chain root to SubjectManifest.audit_log_chain_root
  (separate concern; do at the call site)

cargo check --workspace clean. cargo test -p catalogd subject_audit
7/7 PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
commit d16131bcab (parent d25990982c)
root, 2026-05-03 03:19:18 -05:00
5 changed files with 408 additions and 0 deletions

View File

@@ -36,6 +36,7 @@ datafusion = "47"
bytes = "1"
futures = "0.3"
sha2 = "0.10"
hmac = "0.12"
url = "2"
tonic = "0.13"
prost = "0.13"

View File

@@ -16,5 +16,7 @@ chrono = { workspace = true }
uuid = { workspace = true }
object_store = { workspace = true }
arrow = { workspace = true }
sha2 = { workspace = true }
hmac = { workspace = true }
proto = { path = "../proto" }
tonic = { workspace = true }

View File

@@ -2,3 +2,4 @@ pub mod registry;
pub mod service;
pub mod grpc;
pub mod tombstones;
pub mod subject_audit;

View File

@@ -0,0 +1,360 @@
//! Subject audit log writer with HMAC chain.
//!
//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §3.
//!
//! Per-subject append-only JSONL at `_catalog/subjects/<id>.audit.jsonl`.
//! Each row's `row_hmac` is HMAC-SHA256(key, prev_chain_hash || canonical
//! JSON of the row with `row_hmac` cleared to the empty string).
//! `append` returns the new chain root; the caller mirrors it to
//! SubjectManifest.audit_log_chain_root so verifiers can find the latest
//! valid root from the manifest alone.
//!
//! Failures are NOT swallowed here — every error comes back as `Err`,
//! and the caller decides whether to propagate (per spec §3.2 the gateway
//! tool registry SHOULD log + continue rather than block reads).
//! Operators MUST monitor the resulting error count.
//!
//! Key loading: from a sealed file at the path the writer is constructed
//! with. Tests use `with_inline_key` to inject a deterministic key.
use bytes::Bytes;
use hmac::{Hmac, Mac};
use object_store::ObjectStore;
use sha2::Sha256;
use shared::types::SubjectAuditRow;
use std::collections::HashMap;
use std::sync::Arc;
use storaged::ops;
use tokio::sync::Mutex;
const SUBJECT_PREFIX: &str = "_catalog/subjects";
pub const GENESIS_HASH: &str = "GENESIS";
type HmacSha256 = Hmac<Sha256>;
/// Per-subject audit log writer. Holds the signing key and a small
/// in-memory cache of latest chain hash per subject (so we don't read
/// the JSONL file on every append).
///
/// **No Debug impl deliberately** — auto-deriving Debug on this struct
/// would risk leaking the signing key into log lines. Tests that need
/// `Result::unwrap_err` on a function returning `Result<Self, _>` must
/// match on the result instead of using `.unwrap_err()`.
#[derive(Clone)]
pub struct SubjectAuditWriter {
store: Arc<dyn ObjectStore>,
/// HMAC signing key. Loaded once at construction. Never logged.
/// `Vec<u8>` not `String` — keys are bytes, not text.
key: Arc<Vec<u8>>,
/// In-memory cache of the latest chain hash per candidate_id.
/// Loaded lazily from the audit JSONL on first append per subject.
/// Mutex (not RwLock) because every append both reads and writes.
latest_hash: Arc<Mutex<HashMap<String, String>>>,
}
impl SubjectAuditWriter {
/// Construct a writer that loads its signing key from a file at
/// `key_path`. The file MUST be mode 0400 owner-only on production
/// hosts (operator's responsibility — not enforced here because the
/// permission check would be a footgun in tests/dev).
pub async fn from_key_file(
store: Arc<dyn ObjectStore>,
key_path: &std::path::Path,
) -> Result<Self, String> {
let key = tokio::fs::read(key_path)
.await
.map_err(|e| format!("read signing key from {}: {e}", key_path.display()))?;
if key.len() < 32 {
return Err(format!(
"signing key is {} bytes; recommend ≥32 bytes for HMAC-SHA256",
key.len()
));
}
Ok(Self {
store,
key: Arc::new(key),
latest_hash: Arc::new(Mutex::new(HashMap::new())),
})
}
/// Construct a writer with an inline key. Intended for tests + bring-up.
/// Production should use `from_key_file`.
pub fn with_inline_key(store: Arc<dyn ObjectStore>, key: Vec<u8>) -> Self {
Self {
store,
key: Arc::new(key),
latest_hash: Arc::new(Mutex::new(HashMap::new())),
}
}
fn audit_key(candidate_id: &str) -> String {
let safe: String = candidate_id
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.' {
c
} else {
'_'
}
})
.collect();
format!("{SUBJECT_PREFIX}/{}.audit.jsonl", safe)
}
/// Compute HMAC-SHA256(key, prev_hash_bytes || canonical_row_bytes).
/// Returns the HMAC as lowercase hex.
fn compute_hmac(&self, prev_hash: &str, canonical_row: &[u8]) -> String {
let mut mac = <HmacSha256 as Mac>::new_from_slice(&self.key)
.expect("HMAC accepts any key length");
mac.update(prev_hash.as_bytes());
mac.update(canonical_row);
let result = mac.finalize().into_bytes();
let mut s = String::with_capacity(64);
for byte in result {
s.push_str(&format!("{:02x}", byte));
}
s
}
/// Append one audit row for a subject. Computes the HMAC chain link,
/// writes to the per-subject JSONL, returns the new chain root (which
/// the caller MAY mirror to SubjectManifest.audit_log_chain_root).
///
/// On error: logs and returns Err. Caller decides whether to propagate
/// (per spec §3.2 the gateway tool registry SHOULD log + drop).
pub async fn append(&self, mut row: SubjectAuditRow) -> Result<String, String> {
let cid = row.candidate_id.clone();
if cid.is_empty() {
return Err("audit row candidate_id is empty".into());
}
// 1. Find prev_chain_hash: cache on the hot path; the cold path
//    scans the existing JSONL for its last row.
let prev = {
let cache = self.latest_hash.lock().await;
cache.get(&cid).cloned()
};
let prev_hash = match prev {
Some(h) => h,
None => self.scan_latest_hash(&cid).await,
};
row.prev_chain_hash = prev_hash.clone();
row.row_hmac = String::new(); // Always cleared before HMAC computation.
// 2. Canonical-JSON the row WITHOUT the row_hmac field, compute MAC.
let canon = serde_json::to_vec(&row)
.map_err(|e| format!("canonicalize audit row: {e}"))?;
let new_hmac = self.compute_hmac(&prev_hash, &canon);
row.row_hmac = new_hmac.clone();
// 3. Append the row to the JSONL.
let line = serde_json::to_vec(&row)
.map_err(|e| format!("serialize audit row: {e}"))?;
self.append_line(&cid, &line).await?;
// 4. Update cache.
self.latest_hash.lock().await.insert(cid, new_hmac.clone());
Ok(new_hmac)
}
/// Scan the audit JSONL for a subject and return the last row's
/// row_hmac. GENESIS if no log exists or it's empty.
/// Object-store-aware: works across LocalFileSystem + InMemory + S3.
async fn scan_latest_hash(&self, candidate_id: &str) -> String {
let key = Self::audit_key(candidate_id);
let bytes = match ops::get(&self.store, &key).await {
Ok(b) => b,
Err(_) => return GENESIS_HASH.to_string(), // file doesn't exist
};
// Find the last newline-delimited line that parses as a row.
let text = match std::str::from_utf8(&bytes) {
Ok(t) => t,
Err(_) => return GENESIS_HASH.to_string(),
};
for line in text.lines().rev() {
if line.trim().is_empty() {
continue;
}
if let Ok(parsed) = serde_json::from_str::<SubjectAuditRow>(line) {
return parsed.row_hmac;
}
}
GENESIS_HASH.to_string()
}
/// Append a single newline-terminated line to the per-subject log.
/// Reads the existing content (may be empty), appends, writes back.
/// Object stores don't have native append; this is the standard
/// "read-modify-write the tail file" pattern.
async fn append_line(&self, candidate_id: &str, line: &[u8]) -> Result<(), String> {
let key = Self::audit_key(candidate_id);
let existing = ops::get(&self.store, &key).await.unwrap_or_default();
let mut buf = Vec::with_capacity(existing.len() + line.len() + 1);
buf.extend_from_slice(&existing);
if !buf.is_empty() && !buf.ends_with(b"\n") {
buf.push(b'\n');
}
buf.extend_from_slice(line);
buf.push(b'\n');
ops::put(&self.store, &key, Bytes::from(buf)).await
}
/// Verify the full HMAC chain for a subject. Returns Ok(rows_verified)
/// or Err with the first chain break encountered.
pub async fn verify_chain(&self, candidate_id: &str) -> Result<usize, String> {
let key = Self::audit_key(candidate_id);
let bytes = ops::get(&self.store, &key).await
.map_err(|e| format!("read audit log for {candidate_id}: {e}"))?;
let text = std::str::from_utf8(&bytes)
.map_err(|e| format!("audit log not utf-8: {e}"))?;
let mut prev = GENESIS_HASH.to_string();
let mut count = 0usize;
for (lineno, line) in text.lines().enumerate() {
if line.trim().is_empty() {
continue;
}
let mut row: SubjectAuditRow = serde_json::from_str(line)
.map_err(|e| format!("line {} parse failed: {e}", lineno + 1))?;
if row.prev_chain_hash != prev {
return Err(format!(
"chain break at line {}: prev_chain_hash={} expected={}",
lineno + 1, row.prev_chain_hash, prev,
));
}
let claimed = std::mem::take(&mut row.row_hmac);
let canon = serde_json::to_vec(&row)
.map_err(|e| format!("canonicalize line {}: {e}", lineno + 1))?;
let recomputed = self.compute_hmac(&prev, &canon);
if recomputed != claimed {
return Err(format!(
"hmac mismatch at line {}: stored={} recomputed={}",
lineno + 1, claimed, recomputed,
));
}
prev = claimed;
count += 1;
}
Ok(count)
}
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::memory::InMemory;
use shared::types::AuditAccessor;
fn fixture_writer() -> SubjectAuditWriter {
SubjectAuditWriter::with_inline_key(
Arc::new(InMemory::new()),
// Deterministic 32-byte key for test reproducibility.
(0u8..32).collect(),
)
}
fn fixture_row(candidate_id: &str, fields: &[&str]) -> SubjectAuditRow {
SubjectAuditRow {
schema: "subject_audit.v1".into(),
ts: chrono::Utc::now(),
candidate_id: candidate_id.into(),
accessor: AuditAccessor {
kind: "gateway_lookup".into(),
daemon: "gateway".into(),
purpose: "fill_validation".into(),
trace_id: String::new(),
},
fields_accessed: fields.iter().map(|s| s.to_string()).collect(),
result: "success".into(),
prev_chain_hash: String::new(), // filled in by writer
row_hmac: String::new(), // filled in by writer
}
}
#[tokio::test]
async fn first_append_uses_genesis_prev_hash() {
let w = fixture_writer();
let new_root = w.append(fixture_row("CAND-1", &["name"])).await.unwrap();
assert!(!new_root.is_empty());
assert_eq!(new_root.len(), 64); // SHA-256 hex
// After first append, the only row should reference GENESIS as prev.
let bytes = ops::get(&w.store, &SubjectAuditWriter::audit_key("CAND-1"))
.await.unwrap();
let text = std::str::from_utf8(&bytes).unwrap();
let row: SubjectAuditRow = serde_json::from_str(text.trim()).unwrap();
assert_eq!(row.prev_chain_hash, "GENESIS");
assert_eq!(row.row_hmac, new_root);
}
#[tokio::test]
async fn chain_links_each_append() {
let w = fixture_writer();
let h1 = w.append(fixture_row("CAND-2", &["name"])).await.unwrap();
let h2 = w.append(fixture_row("CAND-2", &["phone"])).await.unwrap();
let h3 = w.append(fixture_row("CAND-2", &["email"])).await.unwrap();
assert_ne!(h1, h2);
assert_ne!(h2, h3);
let count = w.verify_chain("CAND-2").await.unwrap();
assert_eq!(count, 3);
}
#[tokio::test]
async fn separate_subjects_have_independent_chains() {
let w = fixture_writer();
let _ = w.append(fixture_row("CAND-A", &["name"])).await.unwrap();
let _ = w.append(fixture_row("CAND-B", &["name"])).await.unwrap();
let _ = w.append(fixture_row("CAND-A", &["phone"])).await.unwrap();
assert_eq!(w.verify_chain("CAND-A").await.unwrap(), 2);
assert_eq!(w.verify_chain("CAND-B").await.unwrap(), 1);
}
#[tokio::test]
async fn tamper_detected_on_verify() {
let w = fixture_writer();
let _ = w.append(fixture_row("CAND-T", &["name"])).await.unwrap();
let _ = w.append(fixture_row("CAND-T", &["phone"])).await.unwrap();
// Mutate the file to flip a `result` field on the first row.
let key = SubjectAuditWriter::audit_key("CAND-T");
let bytes = ops::get(&w.store, &key).await.unwrap();
let mut text = std::str::from_utf8(&bytes).unwrap().to_string();
text = text.replacen("\"success\"", "\"denied\"", 1);
ops::put(&w.store, &key, text.into_bytes().into()).await.unwrap();
let err = w.verify_chain("CAND-T").await.unwrap_err();
assert!(err.contains("hmac mismatch"), "expected hmac mismatch, got: {err}");
}
#[tokio::test]
async fn cold_writer_picks_up_existing_chain() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let key: Vec<u8> = (0u8..32).collect();
let w1 = SubjectAuditWriter::with_inline_key(store.clone(), key.clone());
let h1 = w1.append(fixture_row("CAND-COLD", &["name"])).await.unwrap();
// Drop w1's cache, simulate process restart.
drop(w1);
let w2 = SubjectAuditWriter::with_inline_key(store.clone(), key);
let h2 = w2.append(fixture_row("CAND-COLD", &["phone"])).await.unwrap();
assert_ne!(h1, h2);
assert_eq!(w2.verify_chain("CAND-COLD").await.unwrap(), 2);
}
#[tokio::test]
async fn empty_candidate_id_rejected() {
let w = fixture_writer();
let row = fixture_row("", &["name"]);
let err = w.append(row).await.unwrap_err();
assert!(err.contains("empty"), "got: {err}");
}
#[tokio::test]
async fn key_too_short_rejected_via_file() {
// Write a 16-byte key file (under the 32-byte minimum).
let tmp = std::env::temp_dir().join(format!("sa_key_short_{}.bin", std::process::id()));
tokio::fs::write(&tmp, vec![0u8; 16]).await.unwrap();
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
// `unwrap_err` would require Debug on SubjectAuditWriter, which
// we deliberately don't impl (key-leak risk). Match instead.
match SubjectAuditWriter::from_key_file(store, &tmp).await {
Err(e) => assert!(e.contains("≥32 bytes"), "got: {e}"),
Ok(_) => panic!("expected too-short-key rejection"),
}
let _ = tokio::fs::remove_file(&tmp).await;
}
}

View File

@ -510,3 +510,47 @@ pub struct SubjectManifest {
}
fn default_subject_manifest_schema() -> String { "subject_manifest.v1".into() }
/// Audit accessor — who/why touched a subject's PII.
/// Persisted in subject_audit.v1 rows; never logged in plaintext outside
/// the per-subject audit file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditAccessor {
/// One of: "gateway_lookup", "audit_response", "legal_request",
/// "system_resolve". Caller-defined; SubjectAuditWriter does not
/// validate the kind string.
pub kind: String,
/// Daemon name (e.g. "gateway", "validator", "catalogd").
pub daemon: String,
/// Purpose token (e.g. "fill_validation", "audit_subject_response").
pub purpose: String,
/// X-Lakehouse-Trace-Id for cross-daemon trace correlation. Empty
/// when the request didn't carry one.
#[serde(default)]
pub trace_id: String,
}
/// One audit row in a subject's per-subject audit JSONL.
/// HMAC chain: row_hmac = HMAC-SHA256(key, prev_chain_hash || canonical
/// JSON of the row with row_hmac cleared to the empty string).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubjectAuditRow {
/// Always "subject_audit.v1" for v1.
#[serde(default = "default_subject_audit_schema")]
pub schema: String,
pub ts: chrono::DateTime<chrono::Utc>,
pub candidate_id: String,
pub accessor: AuditAccessor,
/// Field names accessed (e.g. ["name"], ["name", "phone"]).
pub fields_accessed: Vec<String>,
/// "success" | "denied" | "not_found" | "error".
pub result: String,
/// Hash of the previous row in this subject's audit log. "GENESIS"
/// for the first row.
pub prev_chain_hash: String,
/// HMAC-SHA256 over (prev_chain_hash || canonical JSON of the row with
/// row_hmac cleared). Computed by SubjectAuditWriter; callers leave empty
/// on input.
#[serde(default)]
pub row_hmac: String,
}
fn default_subject_audit_schema() -> String { "subject_audit.v1".into() }