gateway: Step 5 — wire SubjectAuditWriter into validator WorkerLookup

Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 5.
Every WorkerLookup.find() call from the validator path now produces
one audit row in the per-subject HMAC-chained JSONL. Failures are
non-blocking — validator continues whether audit succeeds or fails.

Approach: decorator pattern. WorkerLookup is a sync trait by design
(validator's contract is "in-memory snapshot, no per-call I/O") and
audit writes are async, so we can't expand the trait. Instead, a
new AuditingWorkerLookup wraps the inner lookup, captures a
tokio::runtime::Handle at construction, and spawns audit writes from
sync find() onto that handle. The chain stays intact under spawn fan-
out because the writer's per-subject Mutex (shipped in the previous
scrum-fix commit) serializes same-subject appends regardless of how
the spawn calls arrive.

Files changed:

  crates/gateway/src/v1/auditing_worker_lookup.rs (NEW, 175 LOC):
    - AuditingWorkerLookup<inner: dyn WorkerLookup, audit: Option<Arc<Writer>>>
    - new() captures Tokio Handle if audit is Some
    - find() runs inner lookup, then spawns audit append with:
        accessor.kind = "validator_lookup"
        accessor.purpose = "validator_worker_lookup"
        fields_accessed = ["exists"] (validator only proves existence
                          of a subject; downstream code reads policy
                          fields separately and would have its own
                          audit if those become PII)
        result = "success" if found, "not_found" otherwise
    - Audit-disabled path (audit: None) is a transparent passthrough
      — zero overhead, no panic, no runtime requirement.

  crates/gateway/src/v1/mod.rs:
    + pub mod auditing_worker_lookup;

  crates/gateway/src/main.rs:
    - Hoisted subject_audit_writer construction OUT of the V1State
      literal (declaration-order constraint: validate_workers needs
      access to the writer). The hoisted Arc is then reused for the
      V1State.subject_audit field.
    - validate_workers now wraps the raw lookup with
      AuditingWorkerLookup::new(raw, subject_audit_writer.clone())

Tests (4/4 passing):
  - find_existing_subject_writes_success_audit_row
  - find_missing_subject_writes_not_found_audit_row (phantom-id case)
  - audit_disabled_means_no_writes_no_overhead (None pathway)
  - many_finds_to_same_subject_produce_intact_chain (30 sequential
    finds on the same subject, each spawning an audit write — chain
    verifies all 30; regression guard against the race fixed in
    catalogd subject_audit)

Also catches the iterate.rs:324 phantom-ID check transparently —
that codepath calls state.validate_workers.find(...) which now goes
through the wrapper, so every phantom-id rejection logs an audit row
for free.

NOT in this commit (future steps):
  - Step 6: /audit/subject/{id} HTTP endpoint
  - Step 7: Daily retention sweep
  - Threading X-Lakehouse-Trace-Id from request through to audit row
    (currently audit row's accessor.trace_id is empty)

cargo build --release is clean. cargo test -p gateway auditing_worker_lookup:
4/4 PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
root 2026-05-03 03:43:40 -05:00
parent e38f3573ff
commit cd8c59a53d
3 changed files with 237 additions and 26 deletions


@@ -255,6 +255,33 @@ async fn main() {
// surface over the existing aibridge → Ollama path. Future
// phases add provider adapters (39), routing engine (40),
// session state (41), etc. All without changing this mount.
;
// Build subject-audit writer BEFORE the V1State literal so the
// validate_workers wrapper can reference it. None = audit disabled
// (key file missing or unreadable); the gateway still serves PII
// paths but produces no audit trail per spec §3.2.
let subject_audit_writer: Option<std::sync::Arc<catalogd::subject_audit::SubjectAuditWriter>> = {
let key_path = std::env::var("LH_SUBJECT_AUDIT_KEY")
.unwrap_or_else(|_| "/etc/lakehouse/subject_audit.key".into());
match catalogd::subject_audit::SubjectAuditWriter::from_key_file(
store.clone(), std::path::Path::new(&key_path)
).await {
Ok(w) => {
tracing::info!("v1: subject audit log enabled (key: {})", key_path);
Some(std::sync::Arc::new(w))
}
Err(e) => {
tracing::warn!(
"v1: subject audit log DISABLED — {} (set LH_SUBJECT_AUDIT_KEY or place ≥32-byte key at /etc/lakehouse/subject_audit.key)",
e
);
None
}
}
};
app = app
.nest("/v1", v1::router(v1::V1State {
ai_client: ai_client.clone(),
usage: std::sync::Arc::new(tokio::sync::RwLock::new(v1::Usage::default())),
@@ -334,7 +361,7 @@ async fn main() {
let path_str = std::env::var("LH_WORKERS_PARQUET")
.unwrap_or_else(|_| "/home/profit/lakehouse/data/datasets/workers_500k.parquet".into());
let path = std::path::Path::new(&path_str);
if path.exists() {
let raw: std::sync::Arc<dyn validator::WorkerLookup> = if path.exists() {
match validator::staffing::parquet_lookup::load_workers_parquet(path) {
Ok(lookup) => {
tracing::info!("v1: workers parquet loaded from {} — /v1/validate worker-existence checks enabled", path_str);
@@ -348,7 +375,16 @@ async fn main() {
} else {
tracing::warn!("v1: workers parquet at {} not found — /v1/validate worker-existence checks will fail Consistency", path_str);
std::sync::Arc::new(validator::InMemoryWorkerLookup::new())
}
};
// Wrap with subject-audit decorator. When subject_audit_writer
// is None, the wrapper is a transparent passthrough — no
// audit, no overhead. When Some, every find() call fires an
// audit row via the per-subject HMAC chain. (Step 5 of
// SUBJECT_MANIFESTS_ON_CATALOGD spec.)
std::sync::Arc::new(v1::auditing_worker_lookup::AuditingWorkerLookup::new(
raw,
subject_audit_writer.clone(),
))
},
// Phase 40 early deliverable — Langfuse trace emitter.
// Defaults match mcp-server/tracing.ts conventions so
@@ -378,30 +414,10 @@ async fn main() {
}
s
},
// Per-subject audit log writer (HMAC-chained JSONL). Loaded
// from /etc/lakehouse/subject_audit.key when present and
// ≥32 bytes. Missing key = audit disabled (warning, not
// error — gateway still serves PII paths but no audit
// trail). See docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md.
subject_audit: {
let key_path = std::env::var("LH_SUBJECT_AUDIT_KEY")
.unwrap_or_else(|_| "/etc/lakehouse/subject_audit.key".into());
match catalogd::subject_audit::SubjectAuditWriter::from_key_file(
store.clone(), std::path::Path::new(&key_path)
).await {
Ok(w) => {
tracing::info!("v1: subject audit log enabled (key: {})", key_path);
Some(std::sync::Arc::new(w))
}
Err(e) => {
tracing::warn!(
"v1: subject audit log DISABLED — {} (set LH_SUBJECT_AUDIT_KEY or place ≥32-byte key at /etc/lakehouse/subject_audit.key)",
e
);
None
}
}
},
// subject_audit was constructed BEFORE the V1State literal
// (see `subject_audit_writer` above) so validate_workers
// could wrap with it. Reuse the same Arc here.
subject_audit: subject_audit_writer.clone(),
}));
// Auth middleware (if enabled) — P5-001 fix 2026-04-23:


@@ -0,0 +1,194 @@
//! Validator WorkerLookup decorator that fires per-find audit rows.
//!
//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 5.
//!
//! Wraps an inner `dyn WorkerLookup`. Every `find(candidate_id)` call
//! produces one `SubjectAuditRow` via the bundled `SubjectAuditWriter`,
//! tagged with `purpose="validator_worker_lookup"`. Audit-write failures
//! are logged only — never propagated to the caller (per spec §3.2:
//! audit-side gaps are preferable to blocking validation).
//!
//! **Why a decorator and not a trait extension**: `WorkerLookup` is a
//! sync trait by design (validator's contract is "in-memory snapshot,
//! no per-call I/O"). Audit writes are async. The decorator bridges
//! them by spawning the audit append onto the current Tokio runtime
//! handle. Per-subject sequencing inside the writer ensures the chain
//! stays intact even with spawned writes (regression-tested at
//! catalogd::subject_audit::tests::concurrent_appends_to_same_subject_serialize).
use catalogd::subject_audit::SubjectAuditWriter;
use shared::types::{AuditAccessor, SubjectAuditRow};
use std::sync::Arc;
use validator::{WorkerLookup, WorkerRecord};
pub struct AuditingWorkerLookup {
inner: Arc<dyn WorkerLookup>,
/// None = audit disabled. find() still runs; just no audit row.
audit: Option<Arc<SubjectAuditWriter>>,
/// Cached Tokio handle so the sync find() path can spawn audit
/// writes onto the right runtime. None at construction = audit
/// is no-op (no runtime context to dispatch into).
runtime_handle: Option<tokio::runtime::Handle>,
}
impl AuditingWorkerLookup {
/// Wrap an inner lookup with optional audit. If `audit` is None,
/// behavior is identical to the inner — no overhead, no audit.
/// MUST be constructed inside a Tokio runtime context (uses
/// `Handle::try_current` to capture the handle).
pub fn new(inner: Arc<dyn WorkerLookup>, audit: Option<Arc<SubjectAuditWriter>>) -> Self {
let runtime_handle = if audit.is_some() {
tokio::runtime::Handle::try_current().ok()
} else {
None
};
if audit.is_some() && runtime_handle.is_none() {
tracing::warn!(
"AuditingWorkerLookup: audit enabled but no Tokio runtime — audit writes disabled"
);
}
Self { inner, audit, runtime_handle }
}
}
impl WorkerLookup for AuditingWorkerLookup {
fn find(&self, candidate_id: &str) -> Option<WorkerRecord> {
let result = self.inner.find(candidate_id);
// Fire audit AFTER the find so the result-state field reflects
// the actual outcome ("success" if found, "not_found" if not).
if let (Some(audit), Some(handle)) = (self.audit.clone(), self.runtime_handle.clone()) {
let cid = candidate_id.to_string();
let result_state = if result.is_some() { "success" } else { "not_found" };
// Single field: "exists". The wrapper itself only proves existence;
// role/city/state are read from the WorkerRecord by downstream code,
// so we don't enumerate every WorkerRecord field as fields_accessed —
// listing them here would imply the caller consumed them at this point.
let row = SubjectAuditRow {
schema: "subject_audit.v1".into(),
ts: chrono::Utc::now(),
candidate_id: cid.clone(),
accessor: AuditAccessor {
kind: "validator_lookup".into(),
daemon: "gateway".into(),
purpose: "validator_worker_lookup".into(),
trace_id: String::new(),
},
fields_accessed: vec!["exists".into()],
result: result_state.into(),
prev_chain_hash: String::new(),
row_hmac: String::new(),
};
// Spawn fire-and-forget. Chain integrity is preserved by the
// writer's per-subject Mutex (regression-tested), so spawning
// is safe. Audit-write errors are logged only.
handle.spawn(async move {
if let Err(e) = audit.append(row).await {
tracing::warn!("validator audit write failed for {cid}: {e}");
}
});
}
result
}
fn len(&self) -> usize {
self.inner.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use catalogd::subject_audit::SubjectAuditWriter;
use object_store::memory::InMemory;
use validator::InMemoryWorkerLookup;
fn fixture_inner_with(records: Vec<WorkerRecord>) -> Arc<dyn WorkerLookup> {
Arc::new(InMemoryWorkerLookup::from_records(records))
}
fn fixture_record(candidate_id: &str) -> WorkerRecord {
WorkerRecord {
candidate_id: candidate_id.into(),
name: "Test".into(),
status: "active".into(),
city: Some("Chicago".into()),
state: Some("IL".into()),
role: Some("Welder".into()),
blacklisted_clients: vec![],
}
}
#[tokio::test]
async fn find_existing_subject_writes_success_audit_row() {
let inner = fixture_inner_with(vec![fixture_record("CAND-EXIST")]);
let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
let writer = Arc::new(SubjectAuditWriter::with_inline_key(
store.clone(),
(0u8..32).collect(),
));
let lookup = AuditingWorkerLookup::new(inner, Some(writer.clone()));
let r = lookup.find("CAND-EXIST");
assert!(r.is_some());
// Audit is spawned — give it a tick to land.
tokio::task::yield_now().await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let count = writer.verify_chain("CAND-EXIST").await.unwrap();
assert_eq!(count, 1);
}
#[tokio::test]
async fn find_missing_subject_writes_not_found_audit_row() {
let inner = fixture_inner_with(vec![]);
let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
let writer = Arc::new(SubjectAuditWriter::with_inline_key(
store.clone(),
(0u8..32).collect(),
));
let lookup = AuditingWorkerLookup::new(inner, Some(writer.clone()));
let r = lookup.find("CAND-PHANTOM");
assert!(r.is_none());
tokio::task::yield_now().await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// verify_chain returns the row count for that subject (1 row,
// result="not_found"). We assert only the count here — verify_chain
// checks chain integrity, not the result text.
assert_eq!(writer.verify_chain("CAND-PHANTOM").await.unwrap(), 1);
}
#[tokio::test]
async fn audit_disabled_means_no_writes_no_overhead() {
let inner = fixture_inner_with(vec![fixture_record("CAND-A")]);
let lookup = AuditingWorkerLookup::new(inner, None);
// Just confirm find() still works with no audit — no panic, no
// runtime-handle requirement.
assert!(lookup.find("CAND-A").is_some());
assert!(lookup.find("CAND-MISSING").is_none());
assert_eq!(lookup.len(), 1);
}
#[tokio::test]
async fn many_finds_to_same_subject_produce_intact_chain() {
// Same regression as catalogd's concurrent test, but exercised
// through the validator wrapper path. 30 sequential finds on
// the SAME subject should produce an intact 30-row chain.
let inner = fixture_inner_with(vec![fixture_record("CAND-CHAIN")]);
let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
let writer = Arc::new(SubjectAuditWriter::with_inline_key(
store.clone(),
(0u8..32).collect(),
));
let lookup = AuditingWorkerLookup::new(inner, Some(writer.clone()));
for _ in 0..30 {
let _ = lookup.find("CAND-CHAIN");
}
// Wait for all spawns to finish — the test runtime is
// single-threaded, so a few yields drain the spawned tasks.
for _ in 0..50 {
tokio::task::yield_now().await;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let count = writer.verify_chain("CAND-CHAIN").await.unwrap();
assert_eq!(count, 30, "chain corrupted under sequential spawn load");
}
}


@@ -20,6 +20,7 @@ pub mod kimi;
pub mod opencode;
pub mod validate;
pub mod iterate;
pub mod auditing_worker_lookup;
pub mod langfuse_trace;
pub mod session_log;
pub mod mode;