diff --git a/Cargo.lock b/Cargo.lock index 4b0285b..1985e5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1365,10 +1365,13 @@ dependencies = [ "axum", "bytes", "chrono", + "hmac", "object_store", + "parquet 55.2.0", "proto", "serde", "serde_json", + "sha2", "shared", "storaged", "tokio", diff --git a/crates/catalogd/src/audit_endpoint.rs b/crates/catalogd/src/audit_endpoint.rs index 9fc9324..7847434 100644 --- a/crates/catalogd/src/audit_endpoint.rs +++ b/crates/catalogd/src/audit_endpoint.rs @@ -69,9 +69,13 @@ impl AuditEndpointState { legal_token_path.display() ); None - } else if trimmed.len() < 16 { + } else if trimmed.len() < 32 { + // 2026-05-03 kimi scrum BLOCK at audit_endpoint.rs:59-60: + // 16-byte minimum was too permissive for a legal-tier + // secret. Aligned to the SubjectAuditWriter signing-key + // minimum (32 bytes — HMAC-SHA256 best practice). tracing::warn!( - "audit endpoint: legal token at {} is {} chars (recommend ≥32) — endpoint will 503", + "audit endpoint: legal token at {} is {} chars (minimum 32) — endpoint will 503", legal_token_path.display(), trimmed.len() ); None @@ -228,19 +232,38 @@ async fn audit_subject( }; // 3. Verify chain end-to-end (full log, not just the windowed slice; - // chain integrity is global, not per-window). + // chain integrity is global, not per-window). chain_root MUST + // come from the actual log tip — taking it from the windowed + // `rows` would lie when `to` is set or the window catches no + // rows. (2026-05-03 opus scrum WARN audit_endpoint.rs:206.) let (chain_verified, chain_rows_total, chain_root, chain_err) = match state.writer.verify_chain(&candidate_id).await { Ok(n) => { - // Find the latest row's hmac as chain_root for echo. - let root = rows.last() - .map(|r| r.row_hmac.clone()) + let tip = state.writer.chain_tip(&candidate_id).await .unwrap_or_else(|| "GENESIS".into()); - (true, n, root, None) + (true, n, tip, None) } Err(e) => (false, 0, String::new(), Some(e)), }; + // 4. 
Tampering detection: if the manifest claims a non-trivial + // chain root but the log is empty (chain_rows_total == 0), the + // log was deleted between writes. Flag it. (2026-05-03 opus + // scrum WARN subject_audit.rs:347 + audit_endpoint coverage.) + let manifest_root = manifest.audit_log_chain_root.clone(); + let log_was_tampered = chain_verified + && chain_rows_total == 0 + && !manifest_root.is_empty() + && manifest_root != "GENESIS"; + let (chain_verified, chain_err) = if log_was_tampered { + (false, Some(format!( + "manifest expects chain root '{}' but the audit log has 0 rows; log was deleted or wiped", + manifest_root, + ))) + } else { + (chain_verified, chain_err) + }; + let datasets_referenced = manifest.datasets.iter() .map(|d| DatasetRefEcho { name: d.name.clone(), diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs index c4f261c..ad07538 100644 --- a/crates/catalogd/src/registry.rs +++ b/crates/catalogd/src/registry.rs @@ -202,6 +202,20 @@ impl Registry { // so get_subject lookups match the storage key shape. Ok(subj) => { let map_key = sanitize_view_name(&subj.candidate_id); + // Rebuild collision detection — symmetric with the + // put_subject collision guard. If pre-existing storage + // somehow contains two manifests whose raw ids + // sanitize to the same key, log loudly so operators + // know one was dropped instead of silently losing it. + // (2026-05-03 opus scrum WARN registry.rs:201.) + if let Some(existing) = subjects.get(&map_key) { + if existing.candidate_id != subj.candidate_id { + tracing::warn!( + "subject rebuild collision on sanitized key '{map_key}': raw '{}' was loaded earlier; raw '{}' is being silently overwritten in memory. 
Run dedupe + delete the duplicate manifest file at storage.", existing.candidate_id, subj.candidate_id, ); } } subjects.insert(map_key, subj); } Err(e) => tracing::warn!("subject '{key}': parse failed: {e}"), diff --git a/crates/catalogd/src/subject_audit.rs b/crates/catalogd/src/subject_audit.rs index a106c14..db505c6 100644 --- a/crates/catalogd/src/subject_audit.rs +++ b/crates/catalogd/src/subject_audit.rs @@ -330,6 +330,27 @@ impl SubjectAuditWriter { Ok(out) } + /// Return the latest row_hmac in this subject's audit log, or None + /// if the log doesn't exist / has no rows. Cache-aware: hits the + /// in-memory cache when the writer has appended at least once for + /// this subject in the current process, otherwise scans the log + /// tail (single object_store read). + /// + /// Used by /audit/subject/{id} to echo the actual chain tip in the + /// response. Taking the tip from a windowed slice of rows would + /// lie when the window doesn't cover the latest row. + pub async fn chain_tip(&self, candidate_id: &str) -> Option<String> { + // Cache hit? + if let Some(h) = self.latest_hash.lock().await.get(candidate_id).cloned() { + return Some(h); + } + // Cold path — scan the log tail. Returns GENESIS for empty/missing + // logs; we coerce that to None so callers can distinguish "no audit + // activity yet" from "real chain tip exists". + let tip = self.scan_latest_hash(candidate_id).await; + if tip == GENESIS_HASH { None } else { Some(tip) } + } + /// Verify the full HMAC chain for a subject. Returns Ok(rows_verified) /// or Err with the first chain break encountered. /// diff --git a/crates/gateway/src/execution_loop/mod.rs b/crates/gateway/src/execution_loop/mod.rs index 22ee3fc..b570544 100644 --- a/crates/gateway/src/execution_loop/mod.rs +++ b/crates/gateway/src/execution_loop/mod.rs @@ -1070,15 +1070,29 @@ async fn append_outcomes_row_at( /// row-level concern, not a result-state concern). 
pub(crate) fn audit_result_state(v: &serde_json::Value) -> String { if let Some(obj) = v.as_object() { - if obj.get("error").map(is_truthy).unwrap_or(false) { - return "error".into(); - } - if obj.get("denied").map(is_truthy).unwrap_or(false) { - return "denied".into(); - } - if obj.get("not_found").map(is_truthy).unwrap_or(false) { - return "not_found".into(); + // Tighter than before (2026-05-03 opus scrum INFO at + // execution_loop:1085): only flag error/denied/not_found when + // the payload doesn't ALSO have a rows/data/results sibling. + // A successful tool whose response includes an "error" field + // describing one row's problem (out of many) shouldn't taint + // the whole call as "error" — that's a row-level concern, not + // a result-state concern. + let has_data_sibling = obj.contains_key("rows") + || obj.contains_key("data") + || obj.contains_key("results"); + if !has_data_sibling { + if obj.get("error").map(is_truthy).unwrap_or(false) { + return "error".into(); + } + if obj.get("denied").map(is_truthy).unwrap_or(false) { + return "denied".into(); + } + if obj.get("not_found").map(is_truthy).unwrap_or(false) { + return "not_found".into(); + } } + // status is always authoritative — if a tool explicitly says + // status=denied that's not a row-level concern. if let Some(s) = obj.get("status").and_then(|v| v.as_str()) { match s { "denied" | "not_found" | "error" => return s.into(), @@ -2130,4 +2144,23 @@ mod tests { let v = serde_json::json!({"error": null, "denied": false, "rows": [{"candidate_id":"X"}]}); assert_eq!(audit_result_state(&v), "success"); } + + #[test] + fn audit_result_state_does_not_classify_error_when_data_sibling_present() { + // 2026-05-03 opus scrum INFO at execution_loop:1085 regression: + // a tool that returns rows BUT also has an "error" field describing + // one bad row shouldn't taint the whole call as "error". 
+ let v = serde_json::json!({ + "rows": [{"candidate_id": "OK"}, {"candidate_id": "BAD"}], + "error": "1 row had a malformed certification" + }); + assert_eq!(audit_result_state(&v), "success"); + } + + #[test] + fn audit_result_state_status_is_authoritative_even_with_data() { + // Explicit status="denied" wins over data-sibling rule. + let v = serde_json::json!({"status": "denied", "rows": []}); + assert_eq!(audit_result_state(&v), "denied"); + } }