gateway: Step 4 — wire SubjectAuditWriter into tool dispatch

Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 4.
Every tool result that returns rows referencing a subject now produces
audit rows in the per-subject HMAC-chained JSONL. Failures non-blocking.

Changes:

v1/mod.rs:
  + V1State.subject_audit: Option<Arc<SubjectAuditWriter>>
    (None when key file missing — audit becomes a no-op with warning,
    PII paths still serve)

main.rs:
  + Construct SubjectAuditWriter at startup from
    LH_SUBJECT_AUDIT_KEY env or /etc/lakehouse/subject_audit.key.
    Missing/short key = log warning + leave None (gateway boots, audit
    disabled). Same store as the rest of catalogd.

execution_loop/mod.rs:
  + audit_subject_hits_in() — called after every successful tool
    dispatch. Walks the result JSON, finds candidate_id / worker_id
    fields, fires one SubjectAuditRow per (subject, fields) pair.
    Tokio::spawn so audit latency never adds to tool path.
  + collect_subject_hits() — free fn, recursive JSON walker. Handles:
      "candidate_id":"X"  → audit candidate_id="X"
      "worker_id":42      → audit candidate_id="WORKER-42" (matches
                              backfill convention)
      "worker_id":"42"    → audit candidate_id="WORKER-42" (string form)
    Other fields in the same object become fields_accessed (so audit
    row records "this access surfaced name + phone for candidate X").
    Ignores objects without id fields. Skips empty id strings. Recurses
    through nested objects + arrays.

Tests (6/6 passing — gateway::collect_subject_hits_*):
  - finds_candidate_id_strings (basic case + fields_accessed extraction)
  - prefixes_worker_id_int (int → WORKER-N)
  - handles_worker_id_string (string → WORKER-N)
  - recurses_through_nested_objects (joins / mixed payloads)
  - ignores_objects_without_id_fields (no false positives)
  - skips_empty_id_strings (defensive)

Per spec §3.2: failures are logged, never propagated. Better to leak
an audit row than block a tool response. Operators monitor warning
volume to detect audit-write regressions.

NOT in this commit (future steps):
  - Step 5: Wire validator WorkerLookup similarly (each candidate_id
    resolved by FillValidator gets an audit row)
  - Step 6: /audit/subject/{id} HTTP endpoint
  - Step 7: Daily retention sweep
  - Mirror chain root to SubjectManifest.audit_log_chain_root after
    each append (currently the chain is verifiable via verify_chain()
    even without the manifest mirror; the mirror is an optimization)
  - Thread X-Lakehouse-Trace-Id from request through to audit row

cargo build --release clean. cargo test -p gateway collect_subject_hits
6/6 PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-05-03 03:29:24 -05:00
parent bce6dfd1ee
commit fef1efd2ac
3 changed files with 218 additions and 0 deletions

View File

@ -212,6 +212,13 @@ impl ExecutionLoop {
if let Action::ToolCall { tool, args, .. } = &exec_action { if let Action::ToolCall { tool, args, .. } = &exec_action {
match self.dispatch_tool(tool, args).await { match self.dispatch_tool(tool, args).await {
Ok(result) => { Ok(result) => {
// Subject-audit hook (per Phase 1.6 SUBJECT_MANIFESTS spec
// §3.2): every tool result that returns rows referencing a
// subject must produce an audit row. Failures are logged
// but never block the tool result — the spec is explicit:
// "better to leak a row than block legitimate operations."
self.audit_subject_hits_in(&result, tool).await;
let trimmed = trim_result(&result); let trimmed = trim_result(&result);
self.append(LogEntry::new( self.append(LogEntry::new(
turn, "executor", &executor_model, "tool_result", trimmed, turn, "executor", &executor_model, "tool_result", trimmed,
@ -341,6 +348,60 @@ impl ExecutionLoop {
self.log.push(e); self.log.push(e);
} }
/// Subject-audit hook: scan a tool result JSON for subject identifiers,
/// fire one audit row per (subject, fields) pair via the writer in
/// V1State.subject_audit. No-op if writer is None (audit disabled).
///
/// Conventions (matches the backfill binary in catalogd/src/bin/
/// backfill_subjects.rs):
/// - "candidate_id":"X" → audit candidate_id="X"
/// - "worker_id":42 → audit candidate_id="WORKER-42"
///
/// Walks the result JSON recursively. For each object encountered
/// that contains a candidate_id or worker_id field, fires one audit
/// row capturing all the OTHER scalar field names as fields_accessed
/// (so the audit log records "this access surfaced name + phone for
/// candidate X" — sufficient for compliance review).
///
/// Failures are logged but NEVER propagated. Per spec §3.2: an audit
/// gap is preferable to blocking the tool response.
async fn audit_subject_hits_in(&self, result: &serde_json::Value, tool: &str) {
let Some(audit) = self.state.subject_audit.clone() else {
return; // audit disabled — no-op
};
// Collect (candidate_id, fields_accessed) pairs from the JSON.
let mut hits: Vec<(String, Vec<String>)> = Vec::new();
collect_subject_hits(result, &mut hits);
if hits.is_empty() {
return;
}
// Fire one audit row per hit. Spawn so we don't add latency to
// the tool path even if the writer is slow.
for (cid, fields) in hits {
let audit = audit.clone();
let row = shared::types::SubjectAuditRow {
schema: "subject_audit.v1".into(),
ts: chrono::Utc::now(),
candidate_id: cid.clone(),
accessor: shared::types::AuditAccessor {
kind: "gateway_lookup".into(),
daemon: "gateway".into(),
purpose: tool.to_string(),
trace_id: String::new(), // TODO: thread X-Lakehouse-Trace-Id through
},
fields_accessed: fields,
result: "success".into(),
prev_chain_hash: String::new(),
row_hmac: String::new(),
};
tokio::spawn(async move {
if let Err(e) = audit.append(row).await {
tracing::warn!("subject audit write failed for {cid}: {e}");
}
});
}
}
/// Dispatch: model name prefix → provider. /// Dispatch: model name prefix → provider.
/// Local path uses Phase 21 `generate_continuable` (auto-continuation, /// Local path uses Phase 21 `generate_continuable` (auto-continuation,
/// retry on empty thinking-model response). Cloud path hits /// retry on empty thinking-model response). Cloud path hits
@ -984,6 +1045,58 @@ async fn append_outcomes_row_at(
/// PORT FROM orchestrator.ts:306-311. Cap `rows` at 20 entries and /// PORT FROM orchestrator.ts:306-311. Cap `rows` at 20 entries and
/// annotate the truncation so the executor sees it on the next turn /// annotate the truncation so the executor sees it on the next turn
/// Walk a tool-result JSON value recursively. For each object that
/// contains a candidate_id (string) or worker_id (int/string), append
/// (canonical_candidate_id, fields_in_object) to `out`. Conventions
/// match the backfill binary: bare candidate_id stays as-is; worker_id
/// gets prefixed with "WORKER-".
///
/// fields_in_object captures the OTHER scalar field names found in the
/// same object — so the audit row records "this access surfaced
/// {name, phone} for candidate X". The id field itself is NOT included
/// in fields_accessed (it's the subject identifier, not PII data).
pub(crate) fn collect_subject_hits(
v: &serde_json::Value,
out: &mut Vec<(String, Vec<String>)>,
) {
match v {
serde_json::Value::Object(map) => {
// Detect a subject identifier in this object's direct fields.
let mut subject_id: Option<String> = None;
if let Some(cid) = map.get("candidate_id").and_then(|v| v.as_str()) {
if !cid.is_empty() { subject_id = Some(cid.to_string()); }
} else if let Some(wid) = map.get("worker_id") {
if let Some(s) = wid.as_str() {
if !s.is_empty() { subject_id = Some(format!("WORKER-{s}")); }
} else if let Some(n) = wid.as_i64() {
subject_id = Some(format!("WORKER-{n}"));
} else if let Some(n) = wid.as_u64() {
subject_id = Some(format!("WORKER-{n}"));
}
}
if let Some(cid) = subject_id {
let fields: Vec<String> = map
.keys()
.filter(|k| *k != "candidate_id" && *k != "worker_id")
.cloned()
.collect();
out.push((cid, fields));
}
// Recurse — nested objects may carry additional subjects
// (e.g. join results with both candidate + recruiter rows).
for v in map.values() {
collect_subject_hits(v, out);
}
}
serde_json::Value::Array(arr) => {
for v in arr {
collect_subject_hits(v, out);
}
}
_ => {}
}
}
/// prompt — prevents a 1000-row hybrid_search result from wiping the /// prompt — prevents a 1000-row hybrid_search result from wiping the
/// context budget on a single tool call. /// context budget on a single tool call.
fn trim_result(r: &serde_json::Value) -> serde_json::Value { fn trim_result(r: &serde_json::Value) -> serde_json::Value {
@ -1862,4 +1975,79 @@ mod tests {
let p = build_reviewer_prompt(&req, &log); let p = build_reviewer_prompt(&req, &log);
assert!(!p.contains("HARD RULE")); assert!(!p.contains("HARD RULE"));
} }
// --- Subject-audit hook tests ---
#[test]
fn collect_subject_hits_finds_candidate_id_strings() {
let v = serde_json::json!({
"rows": [
{"candidate_id": "CAND-001", "name": "x", "phone": "y"},
{"candidate_id": "CAND-002", "name": "z"},
]
});
let mut hits = Vec::new();
collect_subject_hits(&v, &mut hits);
assert_eq!(hits.len(), 2);
assert_eq!(hits[0].0, "CAND-001");
// Field order from JSON object is preserved in serde_json::Map.
assert!(hits[0].1.contains(&"name".to_string()));
assert!(hits[0].1.contains(&"phone".to_string()));
assert!(!hits[0].1.contains(&"candidate_id".to_string()));
assert_eq!(hits[1].0, "CAND-002");
}
#[test]
fn collect_subject_hits_prefixes_worker_id_int() {
let v = serde_json::json!({"rows": [{"worker_id": 42, "name": "Alice"}]});
let mut hits = Vec::new();
collect_subject_hits(&v, &mut hits);
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].0, "WORKER-42");
assert_eq!(hits[0].1, vec!["name".to_string()]);
}
#[test]
fn collect_subject_hits_handles_worker_id_string() {
let v = serde_json::json!({"worker_id": "42", "name": "Bob"});
let mut hits = Vec::new();
collect_subject_hits(&v, &mut hits);
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].0, "WORKER-42");
}
#[test]
fn collect_subject_hits_recurses_through_nested_objects() {
let v = serde_json::json!({
"outer": {
"candidate_id": "CAND-A",
"linked": {
"worker_id": 7,
"role": "welder"
}
}
});
let mut hits = Vec::new();
collect_subject_hits(&v, &mut hits);
assert_eq!(hits.len(), 2);
let ids: Vec<&str> = hits.iter().map(|(c, _)| c.as_str()).collect();
assert!(ids.contains(&"CAND-A"));
assert!(ids.contains(&"WORKER-7"));
}
#[test]
fn collect_subject_hits_ignores_objects_without_id_fields() {
let v = serde_json::json!({"rows": [{"name": "x"}, {"role": "y"}]});
let mut hits = Vec::new();
collect_subject_hits(&v, &mut hits);
assert_eq!(hits.len(), 0);
}
#[test]
fn collect_subject_hits_skips_empty_id_strings() {
let v = serde_json::json!({"rows": [{"candidate_id": "", "name": "x"}]});
let mut hits = Vec::new();
collect_subject_hits(&v, &mut hits);
assert_eq!(hits.len(), 0);
}
} }

View File

@ -378,6 +378,30 @@ async fn main() {
} }
s s
}, },
// Per-subject audit log writer (HMAC-chained JSONL). Loaded
// from /etc/lakehouse/subject_audit.key when present and
// ≥32 bytes. Missing key = audit disabled (warning, not
// error — gateway still serves PII paths but no audit
// trail). See docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md.
subject_audit: {
let key_path = std::env::var("LH_SUBJECT_AUDIT_KEY")
.unwrap_or_else(|_| "/etc/lakehouse/subject_audit.key".into());
match catalogd::subject_audit::SubjectAuditWriter::from_key_file(
store.clone(), std::path::Path::new(&key_path)
).await {
Ok(w) => {
tracing::info!("v1: subject audit log enabled (key: {})", key_path);
Some(std::sync::Arc::new(w))
}
Err(e) => {
tracing::warn!(
"v1: subject audit log DISABLED — {} (set LH_SUBJECT_AUDIT_KEY or place ≥32-byte key at /etc/lakehouse/subject_audit.key)",
e
);
None
}
}
},
})); }));
// Auth middleware (if enabled) — P5-001 fix 2026-04-23: // Auth middleware (if enabled) — P5-001 fix 2026-04-23:

View File

@ -91,6 +91,12 @@ pub struct V1State {
/// `[validatord].session_log_path` (commit 1a3a82a in /// `[validatord].session_log_path` (commit 1a3a82a in
/// golangLAKEHOUSE). /// golangLAKEHOUSE).
pub session_log: Option<session_log::SessionLogger>, pub session_log: Option<session_log::SessionLogger>,
/// Per-subject audit log writer (HMAC-chained JSONL). Loaded at
/// startup from /etc/lakehouse/subject_audit.key if the file is
/// present and ≥32 bytes. None = audit disabled with a warning
/// at startup; PII access is still served but produces no audit
/// trail. See docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md.
pub subject_audit: Option<std::sync::Arc<catalogd::subject_audit::SubjectAuditWriter>>,
} }
#[derive(Default, Clone, Serialize)] #[derive(Default, Clone, Serialize)]