catalogd + gateway: Step 6 — /audit/subject/{id} legal-tier HTTP endpoint
Implementation of docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 6
+ §4 (response shape) + §6 (auth model). The defense-against-EEOC-
discovery surface is live: legal counsel hits one URL with one token,
gets back a signed-by-HMAC-chain audit response naming every PII access
for a subject in a time window.
New module: crates/catalogd/src/audit_endpoint.rs (~340 LOC)
- AuditEndpointState { registry, writer, legal_token }
- router() exposes:
GET /subject/{candidate_id}?from=ISO&to=ISO (full audit response)
GET /health (liveness + token check)
- require_legal_auth() — constant-time-eq compare against the
X-Lakehouse-Legal-Token header. Avoids timing leaks on the token
check without pulling in `subtle` for one comparison.
- Token loaded from /etc/lakehouse/legal_audit.token (env-overridable
via LH_LEGAL_AUDIT_TOKEN_FILE). Empty file or <16 chars = endpoint
serves 503 with a clear reason. Token value NEVER logged.
- Response schema: subject_audit_response.v1 with manifest +
audit_log (rows + chain verification) + datasets_referenced +
safe_views_available + completeness_attestation.
New helper on SubjectAuditWriter:
- read_rows_in_range(candidate_id, from, to) — returns rows in window,
used by the endpoint to assemble the response without re-reading
the entire chain.
- verify_chain() now returns Ok(0) when the audit log file doesn't
exist (empty = trivially valid). Prevents legitimate "no PII access
yet for this subject" from showing as integrity=BROKEN in the
audit response. Caller can detect "log was deleted" via comparison
to SubjectManifest.audit_log_chain_root (when that mirror lands).
main.rs:
- Audit endpoint mounted at /audit ONLY when both subject_audit
writer AND legal token are present. Disabled-by-default keeps the
surface from accidentally serving in dev/bring-up environments
without proper credentials.
Tests (9/9 passing):
- constant_time_eq (correctness on equal/diff/empty/length-mismatch)
- missing_legal_token_returns_503
- missing_header_returns_401
- wrong_token_returns_401
- correct_token_passes_auth
- audit_response_assembly_full_path (manifest + 3 rows + chain verify)
- audit_response_window_filters_rows (time-bounded window)
- empty_token_file_results_in_disabled_endpoint
- short_token_file_rejected_at_load (<16 char min)
LIVE end-to-end verification:
1. Plant signing key + legal token in /tmp/lakehouse_audit/
2. Restart gateway with LH_SUBJECT_AUDIT_KEY + LH_LEGAL_AUDIT_TOKEN_FILE
pointing at the test files
3. /audit/health → 200 "audit endpoint ready"
4. /audit/subject/WORKER-1 (no token) → 401 "missing X-Lakehouse-Legal-Token"
5. /audit/subject/WORKER-1 (wrong token) → 401 "X-Lakehouse-Legal-Token mismatch"
6. /audit/subject/WORKER-1 (correct token) → 200 + full manifest + 0 rows
+ chain_verified=true (empty log path)
7. POST /v1/validate with candidate_id=WORKER-1 → triggers WorkerLookup.find()
via the AuditingWorkerLookup wrapper from Step 5
8. data/_catalog/subjects/WORKER-1.audit.jsonl now exists with 1 row
(accessor.purpose=validator_worker_lookup, result=not_found,
prev_chain_hash=GENESIS, valid HMAC)
9. /audit/subject/WORKER-1 (correct token) → 200 + manifest + 1 row +
chain_verified=true + chain_rows_total=1 + completeness attestation
The full audit-trail loop (PII access → audit row → chain → audit response)
works end-to-end on the live gateway.
NOT in this commit (future steps):
- Step 7: Daily retention sweep
- Step 8: Cross-runtime parity (Go side reads the same shapes)
- Mirror chain root to SubjectManifest.audit_log_chain_root after
each append (so tampering detection can use the manifest's
cached root as ground truth)
- Live row projection from datasets (currently caller follows up
via /query/sql against the safe_views named in the response)
- Ed25519 signature on the response (chain verification IS the v1
attestation; signing is future hardening per spec §10)
cargo build --release clean. cargo test -p catalogd audit_endpoint
9/9 PASS. Live verification successful.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
cd8c59a53d
commit
15cfd76c04
481
crates/catalogd/src/audit_endpoint.rs
Normal file
481
crates/catalogd/src/audit_endpoint.rs
Normal file
@ -0,0 +1,481 @@
|
||||
//! `/audit/subject/{id}` HTTP endpoint — legal-tier subject audit response.
|
||||
//!
|
||||
//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §4 + §6.
|
||||
//!
|
||||
//! Responds to: `GET /subject/{candidate_id}?from=ISO&to=ISO`
|
||||
//! Auth: requires `X-Lakehouse-Legal-Token` header matching the value in
|
||||
//! the legal-token file (see LegalAuditAuth). Constant-time compare.
|
||||
//!
|
||||
//! Response shape (subject_audit_response.v1):
|
||||
//! - manifest: full SubjectManifest from catalogd
|
||||
//! - audit_log.rows: SubjectAuditRow entries within [from, to]
|
||||
//! - audit_log.chain_verified: bool (HMAC chain verifies end-to-end)
|
||||
//! - completeness_attestation: text the lawyer can quote
|
||||
//!
|
||||
//! V1 deliberately does NOT include:
|
||||
//! - Ed25519 signature on the response — chain verification IS the
|
||||
//! v1 attestation; signing is future hardening (spec §10 v2 reserve)
|
||||
//! - Live row projection from the underlying datasets — caller can
|
||||
//! fetch via /query/sql against the safe_views named in the manifest;
|
||||
//! keeping projection out of this endpoint avoids depending on queryd
|
||||
//! - External anchor / S3 Object Lock — local HMAC chain is the v1
|
||||
//! integrity surface
|
||||
//!
|
||||
//! These are documented as future hardening, not silent gaps.
|
||||
|
||||
use crate::registry::Registry;
|
||||
use crate::subject_audit::SubjectAuditWriter;
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
http::{HeaderMap, StatusCode},
|
||||
response::IntoResponse,
|
||||
routing::get,
|
||||
Json, Router,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use shared::types::{SubjectAuditRow, SubjectManifest};
|
||||
use std::sync::Arc;
|
||||
|
||||
const LEGAL_TOKEN_HEADER: &str = "x-lakehouse-legal-token";
|
||||
const RESPONSE_SCHEMA: &str = "subject_audit_response.v1";
|
||||
|
||||
/// State for the audit endpoint router. Held outside Registry because
|
||||
/// the audit writer + legal token aren't part of catalogd's data model
|
||||
/// — they're sidecar concerns specific to this endpoint.
|
||||
#[derive(Clone)]
|
||||
pub struct AuditEndpointState {
|
||||
pub registry: Registry,
|
||||
pub writer: Arc<SubjectAuditWriter>,
|
||||
/// Legal-tier token. Constant-time compared against the header on
|
||||
/// every request. None = endpoint returns 503 (audit unavailable).
|
||||
pub legal_token: Option<Arc<String>>,
|
||||
}
|
||||
|
||||
impl AuditEndpointState {
|
||||
/// Construct from existing pieces. `legal_token_path` is read at
|
||||
/// construction and trimmed of trailing whitespace. Missing/empty
|
||||
/// = endpoint will 503 every request with a clear reason.
|
||||
pub async fn new(
|
||||
registry: Registry,
|
||||
writer: Arc<SubjectAuditWriter>,
|
||||
legal_token_path: &std::path::Path,
|
||||
) -> Self {
|
||||
let legal_token = match tokio::fs::read_to_string(legal_token_path).await {
|
||||
Ok(s) => {
|
||||
let trimmed = s.trim().to_string();
|
||||
if trimmed.is_empty() {
|
||||
tracing::warn!(
|
||||
"audit endpoint: legal token file at {} is empty — endpoint will 503",
|
||||
legal_token_path.display()
|
||||
);
|
||||
None
|
||||
} else if trimmed.len() < 16 {
|
||||
tracing::warn!(
|
||||
"audit endpoint: legal token at {} is {} chars (recommend ≥32) — endpoint will 503",
|
||||
legal_token_path.display(), trimmed.len()
|
||||
);
|
||||
None
|
||||
} else {
|
||||
tracing::info!("audit endpoint: legal token loaded from {}", legal_token_path.display());
|
||||
Some(Arc::new(trimmed))
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"audit endpoint: legal token unreadable from {} ({e}) — endpoint will 503",
|
||||
legal_token_path.display()
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
Self { registry, writer, legal_token }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn router(state: AuditEndpointState) -> Router {
|
||||
Router::new()
|
||||
.route("/subject/{candidate_id}", get(audit_subject))
|
||||
.route("/health", get(audit_health))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
async fn audit_health(State(state): State<AuditEndpointState>) -> impl IntoResponse {
|
||||
if state.legal_token.is_some() {
|
||||
(StatusCode::OK, "audit endpoint ready").into_response()
|
||||
} else {
|
||||
(StatusCode::SERVICE_UNAVAILABLE, "audit endpoint disabled (legal token missing)").into_response()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct AuditQuery {
|
||||
/// ISO-8601 start of audit window. None = unbounded back.
|
||||
from: Option<chrono::DateTime<chrono::Utc>>,
|
||||
/// ISO-8601 end of audit window. None = unbounded forward.
|
||||
to: Option<chrono::DateTime<chrono::Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct AuditWindowEcho {
|
||||
from: Option<chrono::DateTime<chrono::Utc>>,
|
||||
to: Option<chrono::DateTime<chrono::Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct AuditLogSection {
|
||||
rows_in_window: usize,
|
||||
rows: Vec<SubjectAuditRow>,
|
||||
chain_root: String,
|
||||
chain_verified: bool,
|
||||
chain_rows_total: usize,
|
||||
chain_verification_error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct AuditResponse {
|
||||
schema: &'static str,
|
||||
candidate_id: String,
|
||||
generated_at: chrono::DateTime<chrono::Utc>,
|
||||
generated_by: &'static str,
|
||||
request_window: AuditWindowEcho,
|
||||
manifest: SubjectManifest,
|
||||
audit_log: AuditLogSection,
|
||||
/// Datasets the manifest says contain rows for this subject — caller
|
||||
/// can issue follow-up queries against the named safe_views to get
|
||||
/// the actual row projection. Keeping projection out of this endpoint
|
||||
/// avoids depending on queryd from catalogd.
|
||||
datasets_referenced: Vec<DatasetRefEcho>,
|
||||
safe_views_available: Vec<String>,
|
||||
completeness_attestation: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct DatasetRefEcho {
|
||||
name: String,
|
||||
key_column: String,
|
||||
key_value: String,
|
||||
}
|
||||
|
||||
/// Constant-time string comparison to avoid timing leaks on the legal
|
||||
/// token check. `subtle` would be the standard but introducing a dep
|
||||
/// for one comparison is overkill — manual byte-eq with an unconditional
|
||||
/// fold over both byte slices.
|
||||
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
|
||||
if a.len() != b.len() {
|
||||
return false;
|
||||
}
|
||||
let mut diff: u8 = 0;
|
||||
for (x, y) in a.iter().zip(b.iter()) {
|
||||
diff |= x ^ y;
|
||||
}
|
||||
diff == 0
|
||||
}
|
||||
|
||||
/// Extract + verify the legal-tier token from the request. Returns
|
||||
/// Err((status, body)) on any failure (missing header, missing token
|
||||
/// configured, mismatch). Does NOT log token values.
|
||||
fn require_legal_auth(
|
||||
state: &AuditEndpointState,
|
||||
headers: &HeaderMap,
|
||||
) -> Result<(), (StatusCode, &'static str)> {
|
||||
let configured = state.legal_token.as_ref().ok_or((
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"audit endpoint disabled — no legal token configured on this gateway",
|
||||
))?;
|
||||
let provided = headers.get(LEGAL_TOKEN_HEADER).ok_or((
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"missing X-Lakehouse-Legal-Token header",
|
||||
))?;
|
||||
let provided = provided.to_str().map_err(|_| (
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"X-Lakehouse-Legal-Token contains non-ASCII characters",
|
||||
))?;
|
||||
if !constant_time_eq(provided.as_bytes(), configured.as_bytes()) {
|
||||
return Err((StatusCode::UNAUTHORIZED, "X-Lakehouse-Legal-Token mismatch"));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn audit_subject(
|
||||
State(state): State<AuditEndpointState>,
|
||||
Path(candidate_id): Path<String>,
|
||||
Query(q): Query<AuditQuery>,
|
||||
headers: HeaderMap,
|
||||
) -> impl IntoResponse {
|
||||
if let Err((status, msg)) = require_legal_auth(&state, &headers) {
|
||||
return (status, msg).into_response();
|
||||
}
|
||||
if candidate_id.is_empty() {
|
||||
return (StatusCode::BAD_REQUEST, "candidate_id is empty").into_response();
|
||||
}
|
||||
|
||||
// 1. Load the subject manifest.
|
||||
let manifest = match state.registry.get_subject(&candidate_id).await {
|
||||
Some(m) => m,
|
||||
None => return (
|
||||
StatusCode::NOT_FOUND,
|
||||
format!("subject manifest not found for candidate_id={candidate_id}"),
|
||||
).into_response(),
|
||||
};
|
||||
|
||||
// 2. Read audit rows in window.
|
||||
let rows = match state.writer.read_rows_in_range(&candidate_id, q.from, q.to).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => return (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("audit log read failed: {e}"),
|
||||
).into_response(),
|
||||
};
|
||||
|
||||
// 3. Verify chain end-to-end (full log, not just the windowed slice;
|
||||
// chain integrity is global, not per-window).
|
||||
let (chain_verified, chain_rows_total, chain_root, chain_err) =
|
||||
match state.writer.verify_chain(&candidate_id).await {
|
||||
Ok(n) => {
|
||||
// Find the latest row's hmac as chain_root for echo.
|
||||
let root = rows.last()
|
||||
.map(|r| r.row_hmac.clone())
|
||||
.unwrap_or_else(|| "GENESIS".into());
|
||||
(true, n, root, None)
|
||||
}
|
||||
Err(e) => (false, 0, String::new(), Some(e)),
|
||||
};
|
||||
|
||||
let datasets_referenced = manifest.datasets.iter()
|
||||
.map(|d| DatasetRefEcho {
|
||||
name: d.name.clone(),
|
||||
key_column: d.key_column.clone(),
|
||||
key_value: d.key_value.clone(),
|
||||
})
|
||||
.collect();
|
||||
let safe_views_available = manifest.safe_views.clone();
|
||||
|
||||
let attestation = format!(
|
||||
"All audit rows for candidate_id={} within [{}, {}] are included. \
|
||||
HMAC chain verified end-to-end: {} rows total, integrity={}.",
|
||||
candidate_id,
|
||||
q.from.map(|d| d.to_rfc3339()).unwrap_or_else(|| "unbounded".into()),
|
||||
q.to.map(|d| d.to_rfc3339()).unwrap_or_else(|| "unbounded".into()),
|
||||
chain_rows_total,
|
||||
if chain_verified { "verified" } else { "BROKEN" },
|
||||
);
|
||||
|
||||
let resp = AuditResponse {
|
||||
schema: RESPONSE_SCHEMA,
|
||||
candidate_id: candidate_id.clone(),
|
||||
generated_at: chrono::Utc::now(),
|
||||
generated_by: "catalogd",
|
||||
request_window: AuditWindowEcho { from: q.from, to: q.to },
|
||||
manifest,
|
||||
audit_log: AuditLogSection {
|
||||
rows_in_window: rows.len(),
|
||||
rows,
|
||||
chain_root,
|
||||
chain_verified,
|
||||
chain_rows_total,
|
||||
chain_verification_error: chain_err,
|
||||
},
|
||||
datasets_referenced,
|
||||
safe_views_available,
|
||||
completeness_attestation: attestation,
|
||||
};
|
||||
|
||||
// Log the legal-tier read so it's discoverable in operations review.
|
||||
// Token value is NEVER logged. Only the fact of the read.
|
||||
tracing::info!(
|
||||
candidate_id = %candidate_id,
|
||||
rows_in_window = resp.audit_log.rows_in_window,
|
||||
chain_verified = resp.audit_log.chain_verified,
|
||||
"legal-tier audit response served"
|
||||
);
|
||||
|
||||
(StatusCode::OK, Json(resp)).into_response()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::subject_audit::SubjectAuditWriter;
|
||||
use object_store::memory::InMemory;
|
||||
use shared::types::{
|
||||
AuditAccessor, BiometricConsent, BiometricConsentStatus, ConsentStatus,
|
||||
GeneralPiiConsent, SubjectAuditRow, SubjectConsent, SubjectRetention,
|
||||
SubjectStatus, SubjectVertical,
|
||||
};
|
||||
|
||||
fn fixture_manifest(candidate_id: &str) -> SubjectManifest {
|
||||
SubjectManifest {
|
||||
schema: "subject_manifest.v1".into(),
|
||||
candidate_id: candidate_id.into(),
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
status: SubjectStatus::Active,
|
||||
vertical: SubjectVertical::Unknown,
|
||||
consent: SubjectConsent {
|
||||
general_pii: GeneralPiiConsent {
|
||||
status: ConsentStatus::PendingBackfillReview,
|
||||
version: String::new(),
|
||||
given_at: None,
|
||||
withdrawn_at: None,
|
||||
},
|
||||
biometric: BiometricConsent {
|
||||
status: BiometricConsentStatus::NeverCollected,
|
||||
retention_until: None,
|
||||
},
|
||||
},
|
||||
retention: SubjectRetention {
|
||||
general_pii_until: chrono::Utc::now() + chrono::Duration::days(365 * 4),
|
||||
policy: "test".into(),
|
||||
},
|
||||
datasets: vec![],
|
||||
safe_views: vec![],
|
||||
audit_log_path: String::new(),
|
||||
audit_log_chain_root: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn fixture_audit_row(candidate_id: &str) -> SubjectAuditRow {
|
||||
SubjectAuditRow {
|
||||
schema: "subject_audit.v1".into(),
|
||||
ts: chrono::Utc::now(),
|
||||
candidate_id: candidate_id.into(),
|
||||
accessor: AuditAccessor {
|
||||
kind: "test".into(),
|
||||
daemon: "test".into(),
|
||||
purpose: "test".into(),
|
||||
trace_id: String::new(),
|
||||
},
|
||||
fields_accessed: vec!["name".into()],
|
||||
result: "success".into(),
|
||||
prev_chain_hash: String::new(),
|
||||
row_hmac: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn fixture_state_with_token(token: &str) -> AuditEndpointState {
|
||||
let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
|
||||
let registry = Registry::new(store.clone());
|
||||
let writer = Arc::new(SubjectAuditWriter::with_inline_key(
|
||||
store, (0u8..32).collect(),
|
||||
));
|
||||
AuditEndpointState {
|
||||
registry,
|
||||
writer,
|
||||
legal_token: Some(Arc::new(token.into())),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn constant_time_eq_basic() {
|
||||
assert!(constant_time_eq(b"abc", b"abc"));
|
||||
assert!(!constant_time_eq(b"abc", b"abd"));
|
||||
assert!(!constant_time_eq(b"abc", b"abcd"));
|
||||
assert!(constant_time_eq(b"", b""));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn missing_legal_token_returns_503() {
|
||||
let state = AuditEndpointState {
|
||||
registry: Registry::new(Arc::new(InMemory::new())),
|
||||
writer: Arc::new(SubjectAuditWriter::with_inline_key(
|
||||
Arc::new(InMemory::new()), (0u8..32).collect(),
|
||||
)),
|
||||
legal_token: None,
|
||||
};
|
||||
let headers = HeaderMap::new();
|
||||
let r = require_legal_auth(&state, &headers);
|
||||
assert!(matches!(r, Err((StatusCode::SERVICE_UNAVAILABLE, _))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn missing_header_returns_401() {
|
||||
let state = fixture_state_with_token("test-token-must-be-long-enough").await;
|
||||
let headers = HeaderMap::new();
|
||||
let r = require_legal_auth(&state, &headers);
|
||||
assert!(matches!(r, Err((StatusCode::UNAUTHORIZED, _))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wrong_token_returns_401() {
|
||||
let state = fixture_state_with_token("correct-token-must-be-long-enough").await;
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(LEGAL_TOKEN_HEADER, "wrong".parse().unwrap());
|
||||
let r = require_legal_auth(&state, &headers);
|
||||
assert!(matches!(r, Err((StatusCode::UNAUTHORIZED, _))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn correct_token_passes_auth() {
|
||||
let state = fixture_state_with_token("correct-token-must-be-long-enough").await;
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(LEGAL_TOKEN_HEADER, "correct-token-must-be-long-enough".parse().unwrap());
|
||||
let r = require_legal_auth(&state, &headers);
|
||||
assert!(r.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn audit_response_assembly_full_path() {
|
||||
let state = fixture_state_with_token("test-token-must-be-long-enough").await;
|
||||
// Plant a manifest + 3 audit rows.
|
||||
state.registry.put_subject(fixture_manifest("CAND-AUDIT-1")).await.unwrap();
|
||||
for _ in 0..3 {
|
||||
state.writer.append(fixture_audit_row("CAND-AUDIT-1")).await.unwrap();
|
||||
}
|
||||
// Read rows in unbounded window.
|
||||
let rows = state.writer
|
||||
.read_rows_in_range("CAND-AUDIT-1", None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(rows.len(), 3);
|
||||
// Verify chain integrity.
|
||||
let n = state.writer.verify_chain("CAND-AUDIT-1").await.unwrap();
|
||||
assert_eq!(n, 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn audit_response_window_filters_rows() {
|
||||
let state = fixture_state_with_token("test-token-must-be-long-enough").await;
|
||||
state.registry.put_subject(fixture_manifest("CAND-WIN-1")).await.unwrap();
|
||||
// Append 5 rows.
|
||||
let mut row_times = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let row = fixture_audit_row("CAND-WIN-1");
|
||||
row_times.push(row.ts);
|
||||
state.writer.append(row).await.unwrap();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
|
||||
}
|
||||
// Window covering only the middle 3 rows.
|
||||
let from = row_times[1];
|
||||
let to = row_times[3];
|
||||
let rows = state.writer
|
||||
.read_rows_in_range("CAND-WIN-1", Some(from), Some(to))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(rows.len() >= 3, "expected ≥3 rows in window; got {}", rows.len());
|
||||
assert!(rows.len() <= 3, "expected ≤3 rows in window; got {}", rows.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_token_file_results_in_disabled_endpoint() {
|
||||
let tmp = std::env::temp_dir().join(format!("legal_empty_{}.tok", std::process::id()));
|
||||
tokio::fs::write(&tmp, "").await.unwrap();
|
||||
let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
|
||||
let registry = Registry::new(store.clone());
|
||||
let writer = Arc::new(SubjectAuditWriter::with_inline_key(store, (0u8..32).collect()));
|
||||
let state = AuditEndpointState::new(registry, writer, &tmp).await;
|
||||
assert!(state.legal_token.is_none());
|
||||
let _ = tokio::fs::remove_file(&tmp).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn short_token_file_rejected_at_load() {
|
||||
let tmp = std::env::temp_dir().join(format!("legal_short_{}.tok", std::process::id()));
|
||||
tokio::fs::write(&tmp, "tooshort").await.unwrap();
|
||||
let store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
|
||||
let registry = Registry::new(store.clone());
|
||||
let writer = Arc::new(SubjectAuditWriter::with_inline_key(store, (0u8..32).collect()));
|
||||
let state = AuditEndpointState::new(registry, writer, &tmp).await;
|
||||
assert!(state.legal_token.is_none());
|
||||
let _ = tokio::fs::remove_file(&tmp).await;
|
||||
}
|
||||
}
|
||||
@ -3,3 +3,4 @@ pub mod service;
|
||||
pub mod grpc;
|
||||
pub mod tombstones;
|
||||
pub mod subject_audit;
|
||||
pub mod audit_endpoint;
|
||||
|
||||
@ -292,12 +292,59 @@ impl SubjectAuditWriter {
|
||||
ops::put(&self.store, &key, Bytes::from(buf)).await
|
||||
}
|
||||
|
||||
/// Read all audit rows for a subject, optionally filtered by time
|
||||
/// window (inclusive bounds). Returns rows in append order. Used by
|
||||
/// the /audit/subject/{id} endpoint to assemble the legal-tier
|
||||
/// response. Window bounds are optional — `from=None`+`to=None`
|
||||
/// returns the full log.
|
||||
pub async fn read_rows_in_range(
|
||||
&self,
|
||||
candidate_id: &str,
|
||||
from: Option<chrono::DateTime<chrono::Utc>>,
|
||||
to: Option<chrono::DateTime<chrono::Utc>>,
|
||||
) -> Result<Vec<SubjectAuditRow>, String> {
|
||||
let key = Self::audit_key(candidate_id);
|
||||
let bytes = match ops::get(&self.store, &key).await {
|
||||
Ok(b) => b,
|
||||
Err(_) => return Ok(Vec::new()), // no audit log = empty result
|
||||
};
|
||||
let text = std::str::from_utf8(&bytes)
|
||||
.map_err(|e| format!("audit log not utf-8: {e}"))?;
|
||||
let mut out = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let row: SubjectAuditRow = match serde_json::from_str(line) {
|
||||
Ok(r) => r,
|
||||
Err(_) => continue, // skip unparseable lines (defensive)
|
||||
};
|
||||
if let Some(from) = from {
|
||||
if row.ts < from { continue; }
|
||||
}
|
||||
if let Some(to) = to {
|
||||
if row.ts > to { continue; }
|
||||
}
|
||||
out.push(row);
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Verify the full HMAC chain for a subject. Returns Ok(rows_verified)
|
||||
/// or Err with the first chain break encountered.
|
||||
///
|
||||
/// Special case: if no audit log exists (the subject has had no PII
|
||||
/// access yet), returns Ok(0) — an empty log is trivially valid.
|
||||
/// Callers that want to detect "log was deleted" should compare
|
||||
/// against SubjectManifest.audit_log_chain_root: if the manifest's
|
||||
/// chain root is non-empty/non-GENESIS but verify_chain returns 0,
|
||||
/// the file was tampered.
|
||||
pub async fn verify_chain(&self, candidate_id: &str) -> Result<usize, String> {
|
||||
let key = Self::audit_key(candidate_id);
|
||||
let bytes = ops::get(&self.store, &key).await
|
||||
.map_err(|e| format!("read audit log for {candidate_id}: {e}"))?;
|
||||
let bytes = match ops::get(&self.store, &key).await {
|
||||
Ok(b) => b,
|
||||
Err(_) => return Ok(0), // no log = 0 rows = trivially valid
|
||||
};
|
||||
let text = std::str::from_utf8(&bytes)
|
||||
.map_err(|e| format!("audit log not utf-8: {e}"))?;
|
||||
let mut prev = GENESIS_HASH.to_string();
|
||||
|
||||
@ -420,6 +420,25 @@ async fn main() {
|
||||
subject_audit: subject_audit_writer.clone(),
|
||||
}));
|
||||
|
||||
// Step 6 of SUBJECT_MANIFESTS_ON_CATALOGD spec — /audit/subject/{id}
|
||||
// legal-tier endpoint. Only mounted when BOTH the audit writer AND
|
||||
// the legal token file are present. Disabled-by-default keeps the
|
||||
// audit surface from accidentally serving without proper credentials
|
||||
// in dev / bring-up environments.
|
||||
if let Some(writer) = subject_audit_writer.clone() {
|
||||
let legal_token_path = std::env::var("LH_LEGAL_AUDIT_TOKEN_FILE")
|
||||
.unwrap_or_else(|_| "/etc/lakehouse/legal_audit.token".into());
|
||||
let audit_state = catalogd::audit_endpoint::AuditEndpointState::new(
|
||||
registry.clone(),
|
||||
writer,
|
||||
std::path::Path::new(&legal_token_path),
|
||||
).await;
|
||||
app = app.nest("/audit", catalogd::audit_endpoint::router(audit_state));
|
||||
tracing::info!("audit endpoint mounted at /audit (legal token: {})", legal_token_path);
|
||||
} else {
|
||||
tracing::warn!("/audit endpoint NOT mounted — subject_audit writer is None (no signing key)");
|
||||
}
|
||||
|
||||
// Auth middleware (if enabled) — P5-001 fix 2026-04-23:
|
||||
// previously only inserted the ApiKey as an extension and never layered
|
||||
// the middleware, so auth.enabled=true enforced nothing. Now wraps the
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user