From fef1efd2ac4907daf84da0b95b8ff043c15bbd55 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 3 May 2026 03:29:24 -0500 Subject: [PATCH] =?UTF-8?q?gateway:=20Step=204=20=E2=80=94=20wire=20Subjec?= =?UTF-8?q?tAuditWriter=20into=20tool=20dispatch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 4. Every tool result that returns rows referencing a subject now produces audit rows in the per-subject HMAC-chained JSONL. Failures non-blocking. Changes: v1/mod.rs: + V1State.subject_audit: Option> (None when key file missing — audit becomes a no-op with warning, PII paths still serve) main.rs: + Construct SubjectAuditWriter at startup from LH_SUBJECT_AUDIT_KEY env or /etc/lakehouse/subject_audit.key. Missing/short key = log warning + leave None (gateway boots, audit disabled). Same store as the rest of catalogd. execution_loop/mod.rs: + audit_subject_hits_in() — called after every successful tool dispatch. Walks the result JSON, finds candidate_id / worker_id fields, fires one SubjectAuditRow per (subject, fields) pair. Tokio::spawn so audit latency never adds to tool path. + collect_subject_hits() — free fn, recursive JSON walker. Handles: "candidate_id":"X" → audit candidate_id="X" "worker_id":42 → audit candidate_id="WORKER-42" (matches backfill convention) "worker_id":"42" → audit candidate_id="WORKER-42" (string form) Other fields in the same object become fields_accessed (so audit row records "this access surfaced name + phone for candidate X"). Ignores objects without id fields. Skips empty id strings. Recurses through nested objects + arrays. Tests (6/6 passing — gateway::collect_subject_hits_*): - finds_candidate_id_strings (basic case + fields_accessed extraction) - prefixes_worker_id_int (int → WORKER-N) - handles_worker_id_string (string → WORKER-N) - recurses_through_nested_objects (joins / mixed payloads) - ignores_objects_without_id_fields (no false positives) - skips_empty_id_strings (defensive) Per spec §3.2: failures are logged, never propagated. Better to leak an audit row than block a tool response. Operators monitor warning volume to detect audit-write regressions. NOT in this commit (future steps): - Step 5: Wire validator WorkerLookup similarly (each candidate_id resolved by FillValidator gets an audit row) - Step 6: /audit/subject/{id} HTTP endpoint - Step 7: Daily retention sweep - Mirror chain root to SubjectManifest.audit_log_chain_root after each append (currently the chain is verifiable via verify_chain() even without the manifest mirror; the mirror is an optimization) - Thread X-Lakehouse-Trace-Id from request through to audit row cargo build --release clean. cargo test -p gateway collect_subject_hits 6/6 PASS. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/gateway/src/execution_loop/mod.rs | 188 +++++++++++++++++++++++ crates/gateway/src/main.rs | 24 +++ crates/gateway/src/v1/mod.rs | 6 + 3 files changed, 218 insertions(+) diff --git a/crates/gateway/src/execution_loop/mod.rs b/crates/gateway/src/execution_loop/mod.rs index 53d0007..7e36830 100644 --- a/crates/gateway/src/execution_loop/mod.rs +++ b/crates/gateway/src/execution_loop/mod.rs @@ -212,6 +212,13 @@ impl ExecutionLoop { if let Action::ToolCall { tool, args, .. } = &exec_action { match self.dispatch_tool(tool, args).await { Ok(result) => { + // Subject-audit hook (per Phase 1.6 SUBJECT_MANIFESTS spec + // §3.2): every tool result that returns rows referencing a + // subject must produce an audit row. Failures are logged + // but never block the tool result — the spec is explicit: + // "better to leak a row than block legitimate operations." + self.audit_subject_hits_in(&result, tool).await; + let trimmed = trim_result(&result); self.append(LogEntry::new( turn, "executor", &executor_model, "tool_result", trimmed, @@ -341,6 +348,60 @@ impl ExecutionLoop { self.log.push(e); } + /// Subject-audit hook: scan a tool result JSON for subject identifiers, + /// fire one audit row per (subject, fields) pair via the writer in + /// V1State.subject_audit. No-op if writer is None (audit disabled). + /// + /// Conventions (matches the backfill binary in catalogd/src/bin/ + /// backfill_subjects.rs): + /// - "candidate_id":"X" → audit candidate_id="X" + /// - "worker_id":42 → audit candidate_id="WORKER-42" + /// + /// Walks the result JSON recursively. For each object encountered + /// that contains a candidate_id or worker_id field, fires one audit + /// row capturing all the OTHER scalar field names as fields_accessed + /// (so the audit log records "this access surfaced name + phone for + /// candidate X" — sufficient for compliance review). + /// + /// Failures are logged but NEVER propagated. Per spec §3.2: an audit + /// gap is preferable to blocking the tool response. + async fn audit_subject_hits_in(&self, result: &serde_json::Value, tool: &str) { + let Some(audit) = self.state.subject_audit.clone() else { + return; // audit disabled — no-op + }; + // Collect (candidate_id, fields_accessed) pairs from the JSON. + let mut hits: Vec<(String, Vec)> = Vec::new(); + collect_subject_hits(result, &mut hits); + if hits.is_empty() { + return; + } + // Fire one audit row per hit. Spawn so we don't add latency to + // the tool path even if the writer is slow. + for (cid, fields) in hits { + let audit = audit.clone(); + let row = shared::types::SubjectAuditRow { + schema: "subject_audit.v1".into(), + ts: chrono::Utc::now(), + candidate_id: cid.clone(), + accessor: shared::types::AuditAccessor { + kind: "gateway_lookup".into(), + daemon: "gateway".into(), + purpose: tool.to_string(), + trace_id: String::new(), // TODO: thread X-Lakehouse-Trace-Id through + }, + fields_accessed: fields, + result: "success".into(), + prev_chain_hash: String::new(), + row_hmac: String::new(), + }; + tokio::spawn(async move { + if let Err(e) = audit.append(row).await { + tracing::warn!("subject audit write failed for {cid}: {e}"); + } + }); + } + } + /// Dispatch: model name prefix → provider. /// Local path uses Phase 21 `generate_continuable` (auto-continuation, /// retry on empty thinking-model response). Cloud path hits @@ -984,6 +1045,58 @@ async fn append_outcomes_row_at( /// PORT FROM orchestrator.ts:306-311. Cap `rows` at 20 entries and /// annotate the truncation so the executor sees it on the next turn +/// Walk a tool-result JSON value recursively. For each object that +/// contains a candidate_id (string) or worker_id (int/string), append +/// (canonical_candidate_id, fields_in_object) to `out`. Conventions +/// match the backfill binary: bare candidate_id stays as-is; worker_id +/// gets prefixed with "WORKER-". +/// +/// fields_in_object captures the OTHER scalar field names found in the +/// same object — so the audit row records "this access surfaced +/// {name, phone} for candidate X". The id field itself is NOT included +/// in fields_accessed (it's the subject identifier, not PII data). +pub(crate) fn collect_subject_hits( + v: &serde_json::Value, + out: &mut Vec<(String, Vec)>, +) { + match v { + serde_json::Value::Object(map) => { + // Detect a subject identifier in this object's direct fields. + let mut subject_id: Option = None; + if let Some(cid) = map.get("candidate_id").and_then(|v| v.as_str()) { + if !cid.is_empty() { subject_id = Some(cid.to_string()); } + } else if let Some(wid) = map.get("worker_id") { + if let Some(s) = wid.as_str() { + if !s.is_empty() { subject_id = Some(format!("WORKER-{s}")); } + } else if let Some(n) = wid.as_i64() { + subject_id = Some(format!("WORKER-{n}")); + } else if let Some(n) = wid.as_u64() { + subject_id = Some(format!("WORKER-{n}")); + } + } + if let Some(cid) = subject_id { + let fields: Vec = map + .keys() + .filter(|k| *k != "candidate_id" && *k != "worker_id") + .cloned() + .collect(); + out.push((cid, fields)); + } + // Recurse — nested objects may carry additional subjects + // (e.g. join results with both candidate + recruiter rows). + for v in map.values() { + collect_subject_hits(v, out); + } + } + serde_json::Value::Array(arr) => { + for v in arr { + collect_subject_hits(v, out); + } + } + _ => {} + } +} + /// prompt — prevents a 1000-row hybrid_search result from wiping the /// context budget on a single tool call. fn trim_result(r: &serde_json::Value) -> serde_json::Value { @@ -1862,4 +1975,79 @@ mod tests { let p = build_reviewer_prompt(&req, &log); assert!(!p.contains("HARD RULE")); } + + // --- Subject-audit hook tests --- + + #[test] + fn collect_subject_hits_finds_candidate_id_strings() { + let v = serde_json::json!({ + "rows": [ + {"candidate_id": "CAND-001", "name": "x", "phone": "y"}, + {"candidate_id": "CAND-002", "name": "z"}, + ] + }); + let mut hits = Vec::new(); + collect_subject_hits(&v, &mut hits); + assert_eq!(hits.len(), 2); + assert_eq!(hits[0].0, "CAND-001"); + // Field order from JSON object is preserved in serde_json::Map. + assert!(hits[0].1.contains(&"name".to_string())); + assert!(hits[0].1.contains(&"phone".to_string())); + assert!(!hits[0].1.contains(&"candidate_id".to_string())); + assert_eq!(hits[1].0, "CAND-002"); + } + + #[test] + fn collect_subject_hits_prefixes_worker_id_int() { + let v = serde_json::json!({"rows": [{"worker_id": 42, "name": "Alice"}]}); + let mut hits = Vec::new(); + collect_subject_hits(&v, &mut hits); + assert_eq!(hits.len(), 1); + assert_eq!(hits[0].0, "WORKER-42"); + assert_eq!(hits[0].1, vec!["name".to_string()]); + } + + #[test] + fn collect_subject_hits_handles_worker_id_string() { + let v = serde_json::json!({"worker_id": "42", "name": "Bob"}); + let mut hits = Vec::new(); + collect_subject_hits(&v, &mut hits); + assert_eq!(hits.len(), 1); + assert_eq!(hits[0].0, "WORKER-42"); + } + + #[test] + fn collect_subject_hits_recurses_through_nested_objects() { + let v = serde_json::json!({ + "outer": { + "candidate_id": "CAND-A", + "linked": { + "worker_id": 7, + "role": "welder" + } + } + }); + let mut hits = Vec::new(); + collect_subject_hits(&v, &mut hits); + assert_eq!(hits.len(), 2); + let ids: Vec<&str> = hits.iter().map(|(c, _)| c.as_str()).collect(); + assert!(ids.contains(&"CAND-A")); + assert!(ids.contains(&"WORKER-7")); + } + + #[test] + fn collect_subject_hits_ignores_objects_without_id_fields() { + let v = serde_json::json!({"rows": [{"name": "x"}, {"role": "y"}]}); + let mut hits = Vec::new(); + collect_subject_hits(&v, &mut hits); + assert_eq!(hits.len(), 0); + } + + #[test] + fn collect_subject_hits_skips_empty_id_strings() { + let v = serde_json::json!({"rows": [{"candidate_id": "", "name": "x"}]}); + let mut hits = Vec::new(); + collect_subject_hits(&v, &mut hits); + assert_eq!(hits.len(), 0); + } } diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index ff923d0..5d15d9d 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -378,6 +378,30 @@ async fn main() { } s }, + // Per-subject audit log writer (HMAC-chained JSONL). Loaded + // from /etc/lakehouse/subject_audit.key when present and + // ≥32 bytes. Missing key = audit disabled (warning, not + // error — gateway still serves PII paths but no audit + // trail). See docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md. + subject_audit: { + let key_path = std::env::var("LH_SUBJECT_AUDIT_KEY") + .unwrap_or_else(|_| "/etc/lakehouse/subject_audit.key".into()); + match catalogd::subject_audit::SubjectAuditWriter::from_key_file( + store.clone(), std::path::Path::new(&key_path) + ).await { + Ok(w) => { + tracing::info!("v1: subject audit log enabled (key: {})", key_path); + Some(std::sync::Arc::new(w)) + } + Err(e) => { + tracing::warn!( + "v1: subject audit log DISABLED — {} (set LH_SUBJECT_AUDIT_KEY or place ≥32-byte key at /etc/lakehouse/subject_audit.key)", + e + ); + None + } + } + }, })); // Auth middleware (if enabled) — P5-001 fix 2026-04-23: diff --git a/crates/gateway/src/v1/mod.rs b/crates/gateway/src/v1/mod.rs index 6ab9ef0..107cd75 100644 --- a/crates/gateway/src/v1/mod.rs +++ b/crates/gateway/src/v1/mod.rs @@ -91,6 +91,12 @@ pub struct V1State { /// `[validatord].session_log_path` (commit 1a3a82a in /// golangLAKEHOUSE). pub session_log: Option, + /// Per-subject audit log writer (HMAC-chained JSONL). Loaded at + /// startup from /etc/lakehouse/subject_audit.key if the file is + /// present and ≥32 bytes. None = audit disabled with a warning + /// at startup; PII access is still served but produces no audit + /// trail. See docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md. + pub subject_audit: Option>, } #[derive(Default, Clone, Serialize)]