diff --git a/crates/catalogd/src/audit_endpoint.rs b/crates/catalogd/src/audit_endpoint.rs
new file mode 100644
index 0000000..9fc9324
--- /dev/null
+++ b/crates/catalogd/src/audit_endpoint.rs
@@ -0,0 +1,481 @@
+//! `/audit/subject/{id}` HTTP endpoint — legal-tier subject audit response.
+//!
+//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §4 + §6.
+//!
+//! Responds to: `GET /subject/{candidate_id}?from=ISO&to=ISO`
+//! Auth: requires `X-Lakehouse-Legal-Token` header matching the value in
+//! the legal-token file (see LegalAuditAuth). Constant-time compare.
+//!
+//! Response shape (subject_audit_response.v1):
+//! - manifest: full SubjectManifest from catalogd
+//! - audit_log.rows: SubjectAuditRow entries within [from, to]
+//! - audit_log.chain_verified: bool (HMAC chain verifies end-to-end)
+//! - completeness_attestation: text the lawyer can quote
+//!
+//! V1 deliberately does NOT include:
+//! - Ed25519 signature on the response — chain verification IS the
+//!   v1 attestation; signing is future hardening (spec §10 v2 reserve)
+//! - Live row projection from the underlying datasets — caller can
+//!   fetch via /query/sql against the safe_views named in the manifest;
+//!   keeping projection out of this endpoint avoids depending on queryd
+//! - External anchor / S3 Object Lock — local HMAC chain is the v1
+//!   integrity surface
+//!
+//! These are documented as future hardening, not silent gaps.
+
+use crate::registry::Registry;
+use crate::subject_audit::SubjectAuditWriter;
+use axum::{
+    extract::{Path, Query, State},
+    http::{HeaderMap, StatusCode},
+    response::IntoResponse,
+    routing::get,
+    Json, Router,
+};
+use serde::{Deserialize, Serialize};
+use shared::types::{SubjectAuditRow, SubjectManifest};
+use std::sync::Arc;
+
+const LEGAL_TOKEN_HEADER: &str = "x-lakehouse-legal-token";
+const RESPONSE_SCHEMA: &str = "subject_audit_response.v1";
+
+/// State for the audit endpoint router. Held outside Registry because
+/// the audit writer + legal token aren't part of catalogd's data model
+/// — they're sidecar concerns specific to this endpoint.
+//
+// NOTE(review): generic arguments on these fields were stripped somewhere
+// in transit (`Arc,` / `Option>`); restored from the construction sites
+// below (`Some(Arc::new(trimmed))`, `Arc::new(SubjectAuditWriter::…)`).
+#[derive(Clone)]
+pub struct AuditEndpointState {
+    pub registry: Registry,
+    /// Append-only HMAC-chained audit log writer, shared with catalogd.
+    pub writer: Arc<SubjectAuditWriter>,
+    /// Legal-tier token. Constant-time compared against the header on
+    /// every request. None = endpoint returns 503 (audit unavailable).
+    pub legal_token: Option<Arc<String>>,
+}
+
+impl AuditEndpointState {
+    /// Construct from existing pieces. `legal_token_path` is read at
+    /// construction and trimmed of trailing whitespace. Missing/empty
+    /// = endpoint will 503 every request with a clear reason.
+    pub async fn new(
+        registry: Registry,
+        writer: Arc<SubjectAuditWriter>,
+        legal_token_path: &std::path::Path,
+    ) -> Self {
+        let legal_token = match tokio::fs::read_to_string(legal_token_path).await {
+            Ok(s) => {
+                let trimmed = s.trim().to_string();
+                if trimmed.is_empty() {
+                    tracing::warn!(
+                        "audit endpoint: legal token file at {} is empty — endpoint will 503",
+                        legal_token_path.display()
+                    );
+                    None
+                } else if trimmed.len() < 16 {
+                    // Short tokens are rejected outright rather than accepted
+                    // with a warning — a guessable token on a legal-tier
+                    // surface is worse than a disabled endpoint.
+                    tracing::warn!(
+                        "audit endpoint: legal token at {} is {} chars (recommend ≥32) — endpoint will 503",
+                        legal_token_path.display(), trimmed.len()
+                    );
+                    None
+                } else {
+                    tracing::info!("audit endpoint: legal token loaded from {}", legal_token_path.display());
+                    Some(Arc::new(trimmed))
+                }
+            }
+            Err(e) => {
+                tracing::warn!(
+                    "audit endpoint: legal token unreadable from {} ({e}) — endpoint will 503",
+                    legal_token_path.display()
+                );
+                None
+            }
+        };
+        Self { registry, writer, legal_token }
+    }
+}
+
+/// Build the `/audit` sub-router: subject lookup + readiness probe.
+pub fn router(state: AuditEndpointState) -> Router {
+    Router::new()
+        .route("/subject/{candidate_id}", get(audit_subject))
+        .route("/health", get(audit_health))
+        .with_state(state)
+}
+
+/// Readiness probe: 200 when a legal token is configured, 503 otherwise.
+async fn audit_health(State(state): State<AuditEndpointState>) -> impl IntoResponse {
+    if state.legal_token.is_some() {
+        (StatusCode::OK, "audit endpoint ready").into_response()
+    } else {
+        (StatusCode::SERVICE_UNAVAILABLE, "audit endpoint disabled (legal token missing)").into_response()
+    }
+}
+
+#[derive(Deserialize)]
+struct AuditQuery {
+    /// ISO-8601 start of audit window. None = unbounded back.
+    from: Option<chrono::DateTime<chrono::Utc>>,
+    /// ISO-8601 end of audit window. None = unbounded forward.
+    to: Option<chrono::DateTime<chrono::Utc>>,
+}
+
+/// Echo of the requested window, so the response is self-describing.
+#[derive(Serialize)]
+struct AuditWindowEcho {
+    from: Option<chrono::DateTime<chrono::Utc>>,
+    to: Option<chrono::DateTime<chrono::Utc>>,
+}
+
+#[derive(Serialize)]
+struct AuditLogSection {
+    rows_in_window: usize,
+    rows: Vec<SubjectAuditRow>,
+    chain_root: String,
+    chain_verified: bool,
+    chain_rows_total: usize,
+    chain_verification_error: Option<String>,
+}
+
+#[derive(Serialize)]
+struct AuditResponse {
+    schema: &'static str,
+    candidate_id: String,
+    generated_at: chrono::DateTime<chrono::Utc>,
+    generated_by: &'static str,
+    request_window: AuditWindowEcho,
+    manifest: SubjectManifest,
+    audit_log: AuditLogSection,
+    /// Datasets the manifest says contain rows for this subject — caller
+    /// can issue follow-up queries against the named safe_views to get
+    /// the actual row projection. Keeping projection out of this endpoint
+    /// avoids depending on queryd from catalogd.
+    datasets_referenced: Vec<DatasetRefEcho>,
+    safe_views_available: Vec<String>,
+    completeness_attestation: String,
+}
+
+#[derive(Serialize)]
+struct DatasetRefEcho {
+    name: String,
+    key_column: String,
+    key_value: String,
+}
+
+/// Constant-time string comparison to avoid timing leaks on the legal
+/// token check. `subtle` would be the standard but introducing a dep
+/// for one comparison is overkill — manual byte-eq with an unconditional
+/// fold over both byte slices.
+///
+/// NOTE(review): the early return on length mismatch leaks the token's
+/// LENGTH (not its contents) through timing. Acceptable here — length
+/// is not secret — and conventional for subtle-free implementations.
+fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
+    if a.len() != b.len() {
+        return false;
+    }
+    let mut diff: u8 = 0;
+    for (x, y) in a.iter().zip(b.iter()) {
+        diff |= x ^ y;
+    }
+    diff == 0
+}
+
+/// Extract + verify the legal-tier token from the request. Returns
+/// Err((status, body)) on any failure (missing header, missing token
+/// configured, mismatch). Does NOT log token values.
+fn require_legal_auth(
+    state: &AuditEndpointState,
+    headers: &HeaderMap,
+) -> Result<(), (StatusCode, &'static str)> {
+    let configured = state.legal_token.as_ref().ok_or((
+        StatusCode::SERVICE_UNAVAILABLE,
+        "audit endpoint disabled — no legal token configured on this gateway",
+    ))?;
+    let provided = headers.get(LEGAL_TOKEN_HEADER).ok_or((
+        StatusCode::UNAUTHORIZED,
+        "missing X-Lakehouse-Legal-Token header",
+    ))?;
+    let provided = provided.to_str().map_err(|_| (
+        StatusCode::UNAUTHORIZED,
+        "X-Lakehouse-Legal-Token contains non-ASCII characters",
+    ))?;
+    if !constant_time_eq(provided.as_bytes(), configured.as_bytes()) {
+        return Err((StatusCode::UNAUTHORIZED, "X-Lakehouse-Legal-Token mismatch"));
+    }
+    Ok(())
+}
+
+/// GET /audit/subject/{candidate_id}?from=ISO&to=ISO — assemble the
+/// legal-tier response: manifest + windowed audit rows + end-to-end
+/// chain verification + a quotable completeness attestation.
+async fn audit_subject(
+    State(state): State<AuditEndpointState>,
+    Path(candidate_id): Path<String>,
+    Query(q): Query<AuditQuery>,
+    headers: HeaderMap,
+) -> impl IntoResponse {
+    if let Err((status, msg)) = require_legal_auth(&state, &headers) {
+        return (status, msg).into_response();
+    }
+    if candidate_id.is_empty() {
+        return (StatusCode::BAD_REQUEST, "candidate_id is empty").into_response();
+    }
+
+    // 1. Load the subject manifest.
+    let manifest = match state.registry.get_subject(&candidate_id).await {
+        Some(m) => m,
+        None => return (
+            StatusCode::NOT_FOUND,
+            format!("subject manifest not found for candidate_id={candidate_id}"),
+        ).into_response(),
+    };
+
+    // 2. Read audit rows in window.
+    let rows = match state.writer.read_rows_in_range(&candidate_id, q.from, q.to).await {
+        Ok(r) => r,
+        Err(e) => return (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            format!("audit log read failed: {e}"),
+        ).into_response(),
+    };
+
+    // 3. Verify chain end-to-end (full log, not just the windowed slice;
+    //    chain integrity is global, not per-window).
+    let (chain_verified, chain_rows_total, chain_root, chain_err) =
+        match state.writer.verify_chain(&candidate_id).await {
+            Ok(n) => {
+                // BUGFIX(review): previously echoed the last hmac of the
+                // WINDOWED rows as chain_root. A window that excluded the
+                // log tail echoed a stale root, and an empty window over a
+                // non-empty log echoed "GENESIS". The chain root is a
+                // property of the FULL log — re-read unbounded when a
+                // window was requested (when no window was given, `rows`
+                // already IS the full log, so no extra read).
+                let tail_hmac = if q.from.is_none() && q.to.is_none() {
+                    rows.last().map(|r| r.row_hmac.clone())
+                } else {
+                    state.writer
+                        .read_rows_in_range(&candidate_id, None, None)
+                        .await
+                        .ok()
+                        .and_then(|all| all.last().map(|r| r.row_hmac.clone()))
+                };
+                (true, n, tail_hmac.unwrap_or_else(|| "GENESIS".into()), None)
+            }
+            Err(e) => (false, 0, String::new(), Some(e)),
+        };
+
+    let datasets_referenced = manifest.datasets.iter()
+        .map(|d| DatasetRefEcho {
+            name: d.name.clone(),
+            key_column: d.key_column.clone(),
+            key_value: d.key_value.clone(),
+        })
+        .collect();
+    let safe_views_available = manifest.safe_views.clone();
+
+    let attestation = format!(
+        "All audit rows for candidate_id={} within [{}, {}] are included. \
+         HMAC chain verified end-to-end: {} rows total, integrity={}.",
+        candidate_id,
+        q.from.map(|d| d.to_rfc3339()).unwrap_or_else(|| "unbounded".into()),
+        q.to.map(|d| d.to_rfc3339()).unwrap_or_else(|| "unbounded".into()),
+        chain_rows_total,
+        if chain_verified { "verified" } else { "BROKEN" },
+    );
+
+    let resp = AuditResponse {
+        schema: RESPONSE_SCHEMA,
+        candidate_id: candidate_id.clone(),
+        generated_at: chrono::Utc::now(),
+        generated_by: "catalogd",
+        request_window: AuditWindowEcho { from: q.from, to: q.to },
+        manifest,
+        audit_log: AuditLogSection {
+            rows_in_window: rows.len(),
+            rows,
+            chain_root,
+            chain_verified,
+            chain_rows_total,
+            chain_verification_error: chain_err,
+        },
+        datasets_referenced,
+        safe_views_available,
+        completeness_attestation: attestation,
+    };
+
+    // Log the legal-tier read so it's discoverable in operations review.
+    // Token value is NEVER logged. Only the fact of the read.
+    tracing::info!(
+        candidate_id = %candidate_id,
+        rows_in_window = resp.audit_log.rows_in_window,
+        chain_verified = resp.audit_log.chain_verified,
+        "legal-tier audit response served"
+    );
+
+    (StatusCode::OK, Json(resp)).into_response()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::subject_audit::SubjectAuditWriter;
+    use object_store::memory::InMemory;
+    use shared::types::{
+        AuditAccessor, BiometricConsent, BiometricConsentStatus, ConsentStatus,
+        GeneralPiiConsent, SubjectAuditRow, SubjectConsent, SubjectRetention,
+        SubjectStatus, SubjectVertical,
+    };
+
+    /// Minimal valid manifest for `candidate_id`: active subject, most
+    /// conservative consent values, no datasets/safe_views.
+    fn fixture_manifest(candidate_id: &str) -> SubjectManifest {
+        SubjectManifest {
+            schema: "subject_manifest.v1".into(),
+            candidate_id: candidate_id.into(),
+            created_at: chrono::Utc::now(),
+            updated_at: chrono::Utc::now(),
+            status: SubjectStatus::Active,
+            vertical: SubjectVertical::Unknown,
+            consent: SubjectConsent {
+                general_pii: GeneralPiiConsent {
+                    status: ConsentStatus::PendingBackfillReview,
+                    version: String::new(),
+                    given_at: None,
+                    withdrawn_at: None,
+                },
+                biometric: BiometricConsent {
+                    status: BiometricConsentStatus::NeverCollected,
+                    retention_until: None,
+                },
+            },
+            retention: SubjectRetention {
+                general_pii_until: chrono::Utc::now() + chrono::Duration::days(365 * 4),
+                policy: "test".into(),
+            },
+            datasets: vec![],
+            safe_views: vec![],
+            audit_log_path: String::new(),
+            audit_log_chain_root: String::new(),
+        }
+    }
+
+    /// One audit row stamped "now"; chain fields left blank — the writer
+    /// fills prev_chain_hash/row_hmac on append.
+    fn fixture_audit_row(candidate_id: &str) -> SubjectAuditRow {
+        SubjectAuditRow {
+            schema: "subject_audit.v1".into(),
+            ts: chrono::Utc::now(),
+            candidate_id: candidate_id.into(),
+            accessor: AuditAccessor {
+                kind: "test".into(),
+                daemon: "test".into(),
+                purpose: "test".into(),
+                trace_id: String::new(),
+            },
+            fields_accessed: vec!["name".into()],
+            result: "success".into(),
+            prev_chain_hash: String::new(),
+            row_hmac: String::new(),
+        }
+    }
+
+    async fn fixture_state_with_token(token: &str) -> AuditEndpointState {
+        // NOTE(review): `let store: Arc =` had its generics stripped in
+        // transit; `Arc<dyn object_store::ObjectStore>` is the evident
+        // intent (InMemory coerced to the trait object the registry and
+        // writer share) — confirm against Registry::new's signature.
+        let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
+        let registry = Registry::new(store.clone());
+        let writer = Arc::new(SubjectAuditWriter::with_inline_key(
+            store, (0u8..32).collect(),
+        ));
+        AuditEndpointState {
+            registry,
+            writer,
+            legal_token: Some(Arc::new(token.into())),
+        }
+    }
+
+    #[test]
+    fn constant_time_eq_basic() {
+        assert!(constant_time_eq(b"abc", b"abc"));
+        assert!(!constant_time_eq(b"abc", b"abd"));
+        assert!(!constant_time_eq(b"abc", b"abcd"));
+        assert!(constant_time_eq(b"", b""));
+    }
+
+    #[tokio::test]
+    async fn missing_legal_token_returns_503() {
+        let state = AuditEndpointState {
+            registry: Registry::new(Arc::new(InMemory::new())),
+            writer: Arc::new(SubjectAuditWriter::with_inline_key(
+                Arc::new(InMemory::new()), (0u8..32).collect(),
+            )),
+            legal_token: None,
+        };
+        let headers = HeaderMap::new();
+        let r = require_legal_auth(&state, &headers);
+        assert!(matches!(r, Err((StatusCode::SERVICE_UNAVAILABLE, _))));
+    }
+
+    #[tokio::test]
+    async fn missing_header_returns_401() {
+        let state = fixture_state_with_token("test-token-must-be-long-enough").await;
+        let headers = HeaderMap::new();
+        let r = require_legal_auth(&state, &headers);
+        assert!(matches!(r, Err((StatusCode::UNAUTHORIZED, _))));
+    }
+
+    #[tokio::test]
+    async fn wrong_token_returns_401() {
+        let state = fixture_state_with_token("correct-token-must-be-long-enough").await;
+        let mut headers = HeaderMap::new();
+        headers.insert(LEGAL_TOKEN_HEADER, "wrong".parse().unwrap());
+        let r = require_legal_auth(&state, &headers);
+        assert!(matches!(r, Err((StatusCode::UNAUTHORIZED, _))));
+    }
+
+    #[tokio::test]
+    async fn correct_token_passes_auth() {
+        let state = fixture_state_with_token("correct-token-must-be-long-enough").await;
+        let mut headers = HeaderMap::new();
+        headers.insert(LEGAL_TOKEN_HEADER, "correct-token-must-be-long-enough".parse().unwrap());
+        let r = require_legal_auth(&state, &headers);
+        assert!(r.is_ok());
+    }
+
+    #[tokio::test]
+    async fn audit_response_assembly_full_path() {
+        let state = fixture_state_with_token("test-token-must-be-long-enough").await;
+        // Plant a manifest + 3 audit rows.
+        state.registry.put_subject(fixture_manifest("CAND-AUDIT-1")).await.unwrap();
+        for _ in 0..3 {
+            state.writer.append(fixture_audit_row("CAND-AUDIT-1")).await.unwrap();
+        }
+        // Read rows in unbounded window.
+        let rows = state.writer
+            .read_rows_in_range("CAND-AUDIT-1", None, None)
+            .await
+            .unwrap();
+        assert_eq!(rows.len(), 3);
+        // Verify chain integrity.
+        let n = state.writer.verify_chain("CAND-AUDIT-1").await.unwrap();
+        assert_eq!(n, 3);
+    }
+
+    #[tokio::test]
+    async fn audit_response_window_filters_rows() {
+        let state = fixture_state_with_token("test-token-must-be-long-enough").await;
+        state.registry.put_subject(fixture_manifest("CAND-WIN-1")).await.unwrap();
+        // Append 5 rows with distinct timestamps (2 ms apart).
+        let mut row_times = Vec::new();
+        for _ in 0..5 {
+            let row = fixture_audit_row("CAND-WIN-1");
+            row_times.push(row.ts);
+            state.writer.append(row).await.unwrap();
+            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
+        }
+        // Window covering exactly the middle 3 rows (inclusive bounds).
+        let from = row_times[1];
+        let to = row_times[3];
+        let rows = state.writer
+            .read_rows_in_range("CAND-WIN-1", Some(from), Some(to))
+            .await
+            .unwrap();
+        // BUGFIX(review): was a `>= 3` + `<= 3` assertion pair — together
+        // equivalent to equality but obscuring intent; assert directly.
+        assert_eq!(rows.len(), 3, "inclusive window should cover rows 1..=3");
+    }
+
+    #[tokio::test]
+    async fn empty_token_file_results_in_disabled_endpoint() {
+        let tmp = std::env::temp_dir().join(format!("legal_empty_{}.tok", std::process::id()));
+        tokio::fs::write(&tmp, "").await.unwrap();
+        let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
+        let registry = Registry::new(store.clone());
+        let writer = Arc::new(SubjectAuditWriter::with_inline_key(store, (0u8..32).collect()));
+        let state = AuditEndpointState::new(registry, writer, &tmp).await;
+        assert!(state.legal_token.is_none());
+        let _ = tokio::fs::remove_file(&tmp).await;
+    }
+
+    #[tokio::test]
+    async fn short_token_file_rejected_at_load() {
+        let tmp = std::env::temp_dir().join(format!("legal_short_{}.tok", std::process::id()));
+        tokio::fs::write(&tmp, "tooshort").await.unwrap();
+        let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
+        let registry = Registry::new(store.clone());
+        let writer = Arc::new(SubjectAuditWriter::with_inline_key(store, (0u8..32).collect()));
+        let state = AuditEndpointState::new(registry, writer, &tmp).await;
+        assert!(state.legal_token.is_none());
+        let _ = tokio::fs::remove_file(&tmp).await;
+    }
+}
diff --git a/crates/catalogd/src/lib.rs b/crates/catalogd/src/lib.rs
index 1d02429..e78a5f3 100644
--- a/crates/catalogd/src/lib.rs
+++ b/crates/catalogd/src/lib.rs
@@ -3,3 +3,4 @@ pub mod service;
 pub mod grpc;
 pub mod tombstones;
 pub mod subject_audit;
+pub mod audit_endpoint;
diff --git a/crates/catalogd/src/subject_audit.rs b/crates/catalogd/src/subject_audit.rs
index 0a58ea5..a106c14 100644
--- a/crates/catalogd/src/subject_audit.rs
+++ b/crates/catalogd/src/subject_audit.rs
@@ -292,12 +292,59 @@ impl SubjectAuditWriter {
         ops::put(&self.store, &key,
Bytes::from(buf)).await } + /// Read all audit rows for a subject, optionally filtered by time + /// window (inclusive bounds). Returns rows in append order. Used by + /// the /audit/subject/{id} endpoint to assemble the legal-tier + /// response. Window bounds are optional — `from=None`+`to=None` + /// returns the full log. + pub async fn read_rows_in_range( + &self, + candidate_id: &str, + from: Option>, + to: Option>, + ) -> Result, String> { + let key = Self::audit_key(candidate_id); + let bytes = match ops::get(&self.store, &key).await { + Ok(b) => b, + Err(_) => return Ok(Vec::new()), // no audit log = empty result + }; + let text = std::str::from_utf8(&bytes) + .map_err(|e| format!("audit log not utf-8: {e}"))?; + let mut out = Vec::new(); + for line in text.lines() { + if line.trim().is_empty() { + continue; + } + let row: SubjectAuditRow = match serde_json::from_str(line) { + Ok(r) => r, + Err(_) => continue, // skip unparseable lines (defensive) + }; + if let Some(from) = from { + if row.ts < from { continue; } + } + if let Some(to) = to { + if row.ts > to { continue; } + } + out.push(row); + } + Ok(out) + } + /// Verify the full HMAC chain for a subject. Returns Ok(rows_verified) /// or Err with the first chain break encountered. + /// + /// Special case: if no audit log exists (the subject has had no PII + /// access yet), returns Ok(0) — an empty log is trivially valid. + /// Callers that want to detect "log was deleted" should compare + /// against SubjectManifest.audit_log_chain_root: if the manifest's + /// chain root is non-empty/non-GENESIS but verify_chain returns 0, + /// the file was tampered. 
pub async fn verify_chain(&self, candidate_id: &str) -> Result { let key = Self::audit_key(candidate_id); - let bytes = ops::get(&self.store, &key).await - .map_err(|e| format!("read audit log for {candidate_id}: {e}"))?; + let bytes = match ops::get(&self.store, &key).await { + Ok(b) => b, + Err(_) => return Ok(0), // no log = 0 rows = trivially valid + }; let text = std::str::from_utf8(&bytes) .map_err(|e| format!("audit log not utf-8: {e}"))?; let mut prev = GENESIS_HASH.to_string(); diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 698d987..9b4a8e9 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -420,6 +420,25 @@ async fn main() { subject_audit: subject_audit_writer.clone(), })); + // Step 6 of SUBJECT_MANIFESTS_ON_CATALOGD spec — /audit/subject/{id} + // legal-tier endpoint. Only mounted when BOTH the audit writer AND + // the legal token file are present. Disabled-by-default keeps the + // audit surface from accidentally serving without proper credentials + // in dev / bring-up environments. + if let Some(writer) = subject_audit_writer.clone() { + let legal_token_path = std::env::var("LH_LEGAL_AUDIT_TOKEN_FILE") + .unwrap_or_else(|_| "/etc/lakehouse/legal_audit.token".into()); + let audit_state = catalogd::audit_endpoint::AuditEndpointState::new( + registry.clone(), + writer, + std::path::Path::new(&legal_token_path), + ).await; + app = app.nest("/audit", catalogd::audit_endpoint::router(audit_state)); + tracing::info!("audit endpoint mounted at /audit (legal token: {})", legal_token_path); + } else { + tracing::warn!("/audit endpoint NOT mounted — subject_audit writer is None (no signing key)"); + } + // Auth middleware (if enabled) — P5-001 fix 2026-04-23: // previously only inserted the ApiKey as an extension and never layered // the middleware, so auth.enabled=true enforced nothing. Now wraps the