subjects: 2nd scrum fix wave (token min, chain_tip, tampering, rebuild collision warn)

Second cross-lineage scrum on Steps 5+6 returned 13 distinct findings, 0 convergent.
Three BLOCK-class claims verified as false positives (cache IS written, per-subject
Mutex IS in place, spawn IS safe under writer's lock). Five real fixes shipped:

1. audit_endpoint: legal token min length 16->32 (HMAC-SHA256 best practice, kimi)
2. subject_audit: new chain_tip() returns last hash from full log; audit_endpoint
   now reports chain_root from full chain instead of windowed slice (opus)
3. registry: rebuild loader now warns on sanitize collision (symmetric with
   put_subject's collision guard - opus)
4. audit_endpoint: tampering detection - if manifest expects non-empty chain_root
   but log returns 0 rows, flag chain_verified=false with explicit message (opus)
5. execution_loop::audit_result_state: tightened heuristic - error/denied/not_found
   only classify when no rows/data/results sibling (opus INFO)

Tests: 17 catalogd subject + 6 gateway audit_result_state, all green.
New: audit_result_state_does_not_classify_error_when_data_sibling_present,
     audit_result_state_status_is_authoritative_even_with_data.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-05-03 04:00:42 -05:00
parent 15cfd76c04
commit 2a4b316a15
5 changed files with 109 additions and 15 deletions

3
Cargo.lock generated
View File

@ -1365,10 +1365,13 @@ dependencies = [
"axum", "axum",
"bytes", "bytes",
"chrono", "chrono",
"hmac",
"object_store", "object_store",
"parquet 55.2.0",
"proto", "proto",
"serde", "serde",
"serde_json", "serde_json",
"sha2",
"shared", "shared",
"storaged", "storaged",
"tokio", "tokio",

View File

@ -69,9 +69,13 @@ impl AuditEndpointState {
legal_token_path.display() legal_token_path.display()
); );
None None
} else if trimmed.len() < 16 { } else if trimmed.len() < 32 {
// 2026-05-03 kimi scrum BLOCK at audit_endpoint.rs:59-60:
// 16-byte minimum was too permissive for a legal-tier
// secret. Aligned to the SubjectAuditWriter signing-key
// minimum (32 bytes — HMAC-SHA256 best practice).
tracing::warn!( tracing::warn!(
"audit endpoint: legal token at {} is {} chars (recommend ≥32) — endpoint will 503", "audit endpoint: legal token at {} is {} chars (minimum 32) — endpoint will 503",
legal_token_path.display(), trimmed.len() legal_token_path.display(), trimmed.len()
); );
None None
@ -228,19 +232,38 @@ async fn audit_subject(
}; };
// 3. Verify chain end-to-end (full log, not just the windowed slice; // 3. Verify chain end-to-end (full log, not just the windowed slice;
// chain integrity is global, not per-window). // chain integrity is global, not per-window). chain_root MUST
// come from the actual log tip — taking it from the windowed
// `rows` would lie when `to` is set or the window catches no
// rows. (2026-05-03 opus scrum WARN audit_endpoint.rs:206.)
let (chain_verified, chain_rows_total, chain_root, chain_err) = let (chain_verified, chain_rows_total, chain_root, chain_err) =
match state.writer.verify_chain(&candidate_id).await { match state.writer.verify_chain(&candidate_id).await {
Ok(n) => { Ok(n) => {
// Find the latest row's hmac as chain_root for echo. let tip = state.writer.chain_tip(&candidate_id).await
let root = rows.last()
.map(|r| r.row_hmac.clone())
.unwrap_or_else(|| "GENESIS".into()); .unwrap_or_else(|| "GENESIS".into());
(true, n, root, None) (true, n, tip, None)
} }
Err(e) => (false, 0, String::new(), Some(e)), Err(e) => (false, 0, String::new(), Some(e)),
}; };
// 4. Tampering detection: if the manifest claims a non-trivial
// chain root but the log is empty (chain_rows_total == 0), the
// log was deleted between writes. Flag it. (2026-05-03 opus
// scrum WARN subject_audit.rs:347 + audit_endpoint coverage.)
let manifest_root = manifest.audit_log_chain_root.clone();
let log_was_tampered = chain_verified
&& chain_rows_total == 0
&& !manifest_root.is_empty()
&& manifest_root != "GENESIS";
let (chain_verified, chain_err) = if log_was_tampered {
(false, Some(format!(
"manifest expects chain root '{}' but the audit log has 0 rows; log was deleted or wiped",
manifest_root,
)))
} else {
(chain_verified, chain_err)
};
let datasets_referenced = manifest.datasets.iter() let datasets_referenced = manifest.datasets.iter()
.map(|d| DatasetRefEcho { .map(|d| DatasetRefEcho {
name: d.name.clone(), name: d.name.clone(),

View File

@ -202,6 +202,20 @@ impl Registry {
// so get_subject lookups match the storage key shape. // so get_subject lookups match the storage key shape.
Ok(subj) => { Ok(subj) => {
let map_key = sanitize_view_name(&subj.candidate_id); let map_key = sanitize_view_name(&subj.candidate_id);
// Rebuild collision detection — symmetric with the
// put_subject collision guard. If pre-existing storage
// somehow contains two manifests whose raw ids
// sanitize to the same key, log loudly so operators
// know one was dropped instead of silently losing it.
// (2026-05-03 opus scrum WARN registry.rs:201.)
if let Some(existing) = subjects.get(&map_key) {
if existing.candidate_id != subj.candidate_id {
tracing::warn!(
"subject rebuild collision on sanitized key '{map_key}': raw '{}' was loaded earlier; raw '{}' is being silently overwritten in memory. Run dedupe + delete the duplicate manifest file at storage.",
existing.candidate_id, subj.candidate_id,
);
}
}
subjects.insert(map_key, subj); subjects.insert(map_key, subj);
} }
Err(e) => tracing::warn!("subject '{key}': parse failed: {e}"), Err(e) => tracing::warn!("subject '{key}': parse failed: {e}"),

View File

@ -330,6 +330,27 @@ impl SubjectAuditWriter {
Ok(out) Ok(out)
} }
/// Return the latest row_hmac in this subject's audit log, or None
/// if the log doesn't exist / has no rows. Cache-aware: hits the
/// in-memory cache when the writer has appended at least once for
/// this subject in the current process, otherwise scans the log
/// tail (single object_store read).
///
/// Used by /audit/subject/{id} to echo the actual chain tip in the
/// response. Taking the tip from a windowed slice of rows would
/// lie when the window doesn't cover the latest row.
pub async fn chain_tip(&self, candidate_id: &str) -> Option<String> {
// Cache hit?
if let Some(h) = self.latest_hash.lock().await.get(candidate_id).cloned() {
return Some(h);
}
// Cold path — scan the log tail. Returns GENESIS for empty/missing
// logs; we coerce that to None so callers can distinguish "no audit
// activity yet" from "real chain tip exists".
let tip = self.scan_latest_hash(candidate_id).await;
if tip == GENESIS_HASH { None } else { Some(tip) }
}
/// Verify the full HMAC chain for a subject. Returns Ok(rows_verified) /// Verify the full HMAC chain for a subject. Returns Ok(rows_verified)
/// or Err with the first chain break encountered. /// or Err with the first chain break encountered.
/// ///

View File

@ -1070,6 +1070,17 @@ async fn append_outcomes_row_at(
/// row-level concern, not a result-state concern). /// row-level concern, not a result-state concern).
pub(crate) fn audit_result_state(v: &serde_json::Value) -> String { pub(crate) fn audit_result_state(v: &serde_json::Value) -> String {
if let Some(obj) = v.as_object() { if let Some(obj) = v.as_object() {
// Tighter than before (2026-05-03 opus scrum INFO at
// execution_loop:1085): only flag error/denied/not_found when
// the payload doesn't ALSO have a rows/data/results sibling.
// A successful tool whose response includes an "error" field
// describing one row's problem (out of many) shouldn't taint
// the whole call as "error" — that's a row-level concern, not
// a result-state concern.
let has_data_sibling = obj.contains_key("rows")
|| obj.contains_key("data")
|| obj.contains_key("results");
if !has_data_sibling {
if obj.get("error").map(is_truthy).unwrap_or(false) { if obj.get("error").map(is_truthy).unwrap_or(false) {
return "error".into(); return "error".into();
} }
@ -1079,6 +1090,9 @@ pub(crate) fn audit_result_state(v: &serde_json::Value) -> String {
if obj.get("not_found").map(is_truthy).unwrap_or(false) { if obj.get("not_found").map(is_truthy).unwrap_or(false) {
return "not_found".into(); return "not_found".into();
} }
}
// status is always authoritative — if a tool explicitly says
// status=denied that's not a row-level concern.
if let Some(s) = obj.get("status").and_then(|v| v.as_str()) { if let Some(s) = obj.get("status").and_then(|v| v.as_str()) {
match s { match s {
"denied" | "not_found" | "error" => return s.into(), "denied" | "not_found" | "error" => return s.into(),
@ -2130,4 +2144,23 @@ mod tests {
let v = serde_json::json!({"error": null, "denied": false, "rows": [{"candidate_id":"X"}]}); let v = serde_json::json!({"error": null, "denied": false, "rows": [{"candidate_id":"X"}]});
assert_eq!(audit_result_state(&v), "success"); assert_eq!(audit_result_state(&v), "success");
} }
#[test]
fn audit_result_state_does_not_classify_error_when_data_sibling_present() {
// 2026-05-03 opus scrum INFO at execution_loop:1085 regression:
// a tool that returns rows BUT also has an "error" field describing
// one bad row shouldn't taint the whole call as "error".
let v = serde_json::json!({
"rows": [{"candidate_id": "OK"}, {"candidate_id": "BAD"}],
"error": "1 row had a malformed certification"
});
assert_eq!(audit_result_state(&v), "success");
}
#[test]
fn audit_result_state_status_is_authoritative_even_with_data() {
// Explicit status="denied" wins over data-sibling rule.
let v = serde_json::json!({"status": "denied", "rows": []});
assert_eq!(audit_result_state(&v), "denied");
}
} }