diff --git a/crates/catalogd/src/bin/backfill_subjects.rs b/crates/catalogd/src/bin/backfill_subjects.rs
index 02cec18..f444fc9 100644
--- a/crates/catalogd/src/bin/backfill_subjects.rs
+++ b/crates/catalogd/src/bin/backfill_subjects.rs
@@ -234,7 +234,11 @@ async fn run(args: Args) -> Result<(), String> {
         let permit = sem.clone().acquire_owned().await.map_err(|e| e.to_string())?;
         let reg = reg.clone();
         let mut ds_ref = dataset_ref.clone();
-        ds_ref.key_value = cid.trim_start_matches(&args.candidate_id_prefix).to_string();
+        // strip_prefix gives one-shot semantics; trim_start_matches strips
+        // repeated occurrences (a "WORKER-WORKER-1" id would lose both
+        // prefixes). 2026-05-03 opus scrum WARN at backfill_subjects.rs:189.
+        ds_ref.key_value = cid.strip_prefix(&args.candidate_id_prefix)
+            .unwrap_or(&cid).to_string();
         let safe_views = safe_views.clone();
         let attempted = attempted.clone();
         let inserted = inserted.clone();
diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs
index 707709d..c4f261c 100644
--- a/crates/catalogd/src/registry.rs
+++ b/crates/catalogd/src/registry.rs
@@ -198,7 +198,12 @@ impl Registry {
                 }
             };
             match serde_json::from_slice::<SubjectManifest>(&data) {
-                Ok(subj) => { subjects.insert(subj.candidate_id.clone(), subj); }
+                // Match put_subject: in-memory key is the SANITIZED form
+                // so get_subject lookups match the storage key shape.
+                Ok(subj) => {
+                    let map_key = sanitize_view_name(&subj.candidate_id);
+                    subjects.insert(map_key, subj);
+                }
                 Err(e) => tracing::warn!("subject '{key}': parse failed: {e}"),
             }
         }
@@ -722,6 +727,18 @@ impl Registry {
     /// Create or replace a subject manifest. Validates that referenced
     /// datasets exist in the catalog (fail-fast so dangling references
     /// don't accumulate). Persists to `_catalog/subjects/<id>.json`.
+    ///
+    /// **Storage-key collision guard** (2026-05-03 opus scrum WARN
+    /// at registry.rs:735): the storage key is `sanitize_view_name(id)`
+    /// — `/`, `:`, etc. become `_`. A naive in-memory keying by raw
+    /// candidate_id would let two distinct ids (`CAND/1` + `CAND_1`)
+    /// collide on disk while appearing distinct in memory; the second
+    /// put silently overwrites the first, and `rebuild` only loads one.
+    /// Both the in-memory map AND the storage key now use the sanitized
+    /// form. Caller's raw candidate_id is preserved on the manifest for
+    /// audit-trail integrity (the manifest body has the original).
+    /// We surface a clear error if two raw ids would collide on the
+    /// sanitized key.
     pub async fn put_subject(&self, mut subj: SubjectManifest) -> Result<SubjectManifest, String> {
         if subj.candidate_id.is_empty() {
             return Err("subject candidate_id is empty".into());
@@ -740,18 +757,35 @@ impl Registry {
        }
         subj.updated_at = now;

-        let key = format!("{SUBJECT_PREFIX}/{}.json", sanitize_view_name(&subj.candidate_id));
+        let map_key = sanitize_view_name(&subj.candidate_id);
+        let key = format!("{SUBJECT_PREFIX}/{}.json", map_key);
+
+        // Collision guard: if the sanitized key already exists for a
+        // DIFFERENT raw candidate_id, refuse the put. Better to fail
+        // loudly than silently overwrite an existing subject.
+        {
+            let subjects = self.subjects.read().await;
+            if let Some(existing) = subjects.get(&map_key) {
+                if existing.candidate_id != subj.candidate_id {
+                    return Err(format!(
+                        "subject id '{}' collides with existing subject '{}' on sanitized storage key '{}'",
+                        subj.candidate_id, existing.candidate_id, map_key,
+                    ));
+                }
+            }
+        }
+

         let json = serde_json::to_vec_pretty(&subj).map_err(|e| e.to_string())?;
         ops::put(&self.store, &key, json.into()).await?;
         let mut subjects = self.subjects.write().await;
-        subjects.insert(subj.candidate_id.clone(), subj.clone());
+        subjects.insert(map_key, subj.clone());
         tracing::debug!("subject manifest persisted: {}", subj.candidate_id);
         Ok(subj)
     }

     pub async fn get_subject(&self, candidate_id: &str) -> Option<SubjectManifest> {
-        self.subjects.read().await.get(candidate_id).cloned()
+        self.subjects.read().await.get(&sanitize_view_name(candidate_id)).cloned()
     }

     pub async fn list_subjects(&self) -> Vec<SubjectManifest> {
@@ -759,9 +793,10 @@ impl Registry {
     }

     pub async fn delete_subject(&self, candidate_id: &str) -> Result<(), String> {
-        let key = format!("{SUBJECT_PREFIX}/{}.json", sanitize_view_name(candidate_id));
+        let map_key = sanitize_view_name(candidate_id);
+        let key = format!("{SUBJECT_PREFIX}/{}.json", map_key);
         ops::delete(&self.store, &key).await?;
-        self.subjects.write().await.remove(candidate_id);
+        self.subjects.write().await.remove(&map_key);
         Ok(())
     }
@@ -1207,6 +1242,21 @@ mod tests {
         assert_eq!(loaded.unwrap().status, shared::types::SubjectStatus::Active);
     }

+    #[tokio::test]
+    async fn put_subject_rejects_sanitize_collision() {
+        // 2026-05-03 opus scrum WARN at registry.rs:735 regression:
+        // CAND/1 and CAND_1 sanitize to the same storage key. Without
+        // the collision guard, the second put silently overwrites the
+        // first on disk; with it, the second put fails loudly.
+        let reg = fixture();
+        let s1 = fixture_subject("CAND/1");
+        reg.put_subject(s1).await.unwrap();
+        let s2 = fixture_subject("CAND_1"); // sanitizes to same key
+        let err = reg.put_subject(s2).await.unwrap_err();
+        assert!(err.contains("collides"), "got: {err}");
+        assert_eq!(reg.subjects_count().await, 1, "first subject must survive");
+    }
+
     #[tokio::test]
     async fn delete_subject_removes_in_memory_and_persistence() {
         let store = Arc::new(object_store::memory::InMemory::new());
diff --git a/crates/catalogd/src/subject_audit.rs b/crates/catalogd/src/subject_audit.rs
index c26d087..0a58ea5 100644
--- a/crates/catalogd/src/subject_audit.rs
+++ b/crates/catalogd/src/subject_audit.rs
@@ -30,14 +30,52 @@ pub const GENESIS_HASH: &str = "GENESIS";

 type HmacSha256 = Hmac<Sha256>;

-/// Per-subject audit log writer. Holds the signing key and a small
-/// in-memory cache of latest chain hash per subject (so we don't read
-/// the JSONL file on every append).
+/// Render a JSON value canonically: object keys sorted alphabetically,
+/// no insignificant whitespace, no trailing newline. Recursive — nested
+/// objects also sort their keys. Arrays preserve order (semantically
+/// significant). Numbers, strings, booleans, nulls pass through.
+///
+/// This is intentionally narrower than RFC 8785 (no number normalization,
+/// no UTF-8 NFC re-encoding). For our use — hashing structurally simple
+/// audit-row JSON — alphabetical key order is the only canonicalization
+/// that matters: it makes the byte sequence stable across struct field
+/// reordering and additive schema evolution.
+fn canonical_json(v: &serde_json::Value) -> Result<Vec<u8>, String> {
+    use std::collections::BTreeMap;
+    fn rewrite(v: &serde_json::Value) -> serde_json::Value {
+        match v {
+            serde_json::Value::Object(map) => {
+                let sorted: BTreeMap<String, serde_json::Value> = map
+                    .iter()
+                    .map(|(k, v)| (k.clone(), rewrite(v)))
+                    .collect();
+                serde_json::Value::Object(sorted.into_iter().collect())
+            }
+            serde_json::Value::Array(arr) => {
+                serde_json::Value::Array(arr.iter().map(rewrite).collect())
+            }
+            other => other.clone(),
+        }
+    }
+    let canonical = rewrite(v);
+    serde_json::to_vec(&canonical).map_err(|e| e.to_string())
+}
+
+/// Per-subject audit log writer. Holds the signing key, a small
+/// in-memory cache of latest chain hash per subject, and per-subject
+/// mutexes that serialize concurrent appends to the same subject's log.
 ///
 /// **No Debug impl deliberately** — auto-deriving Debug on this struct
 /// would risk leaking the signing key into log lines. Tests that need
 /// `Result::unwrap_err` on a function returning `Result` must
 /// match on the result instead of using `.unwrap_err()`.
+///
+/// Concurrency: appends to DIFFERENT subjects run in parallel; appends
+/// to the SAME subject serialize via a per-candidate Mutex. Without
+/// per-subject locking the read-modify-write append_line race
+/// (concurrent reads of the same `prev_hash` + concurrent writes
+/// overwriting each other) silently corrupts the chain. Caught by
+/// 2026-05-03 opus scrum BLOCK on subject_audit.rs:172.
 #[derive(Clone)]
 pub struct SubjectAuditWriter {
     store: Arc<dyn ObjectStore>,
@@ -46,8 +84,11 @@ pub struct SubjectAuditWriter {
     key: Arc<Vec<u8>>,
     /// In-memory cache of the latest chain hash per candidate_id.
     /// Loaded lazily from the audit JSONL on first append per subject.
-    /// Mutex (not RwLock) because every append both reads and writes.
     latest_hash: Arc<Mutex<HashMap<String, String>>>,
+    /// Per-subject mutexes. Append acquires the subject's lock before
+    /// the read-modify-write to serialize concurrent appends to the
+    /// same subject. Different subjects don't contend.
+    subject_locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
 }

 impl SubjectAuditWriter {
@@ -72,6 +113,7 @@ impl SubjectAuditWriter {
             store,
             key: Arc::new(key),
             latest_hash: Arc::new(Mutex::new(HashMap::new())),
+            subject_locks: Arc::new(Mutex::new(HashMap::new())),
         })
     }
@@ -82,6 +124,7 @@ impl SubjectAuditWriter {
             store,
             key: Arc::new(key),
             latest_hash: Arc::new(Mutex::new(HashMap::new())),
+            subject_locks: Arc::new(Mutex::new(HashMap::new())),
         }
     }
@@ -100,24 +143,70 @@ impl SubjectAuditWriter {
     }

     /// Compute HMAC-SHA256(key, prev_hash_bytes || canonical_row_bytes).
-    /// Returns the HMAC as lowercase hex.
+    /// Returns the HMAC as lowercase hex. Uses byte-level hex formatting
+    /// in a single pass (avoids per-byte format! allocations).
     fn compute_hmac(&self, prev_hash: &str, canonical_row: &[u8]) -> String {
         let mut mac = <HmacSha256 as Mac>::new_from_slice(&self.key)
             .expect("HMAC accepts any key length");
         mac.update(prev_hash.as_bytes());
         mac.update(canonical_row);
         let result = mac.finalize().into_bytes();
+        const HEX: &[u8; 16] = b"0123456789abcdef";
         let mut s = String::with_capacity(64);
         for byte in result {
-            s.push_str(&format!("{:02x}", byte));
+            s.push(HEX[(byte >> 4) as usize] as char);
+            s.push(HEX[(byte & 0x0f) as usize] as char);
         }
         s
     }

+    /// Render an audit row as canonical JSON: keys sorted alphabetically,
+    /// no insignificant whitespace, row_hmac field excluded. This is the
+    /// byte sequence the HMAC is computed over.
+    ///
+    /// **Why a manual canonicalizer instead of `serde_json::to_vec`:**
+    /// serde_json serializes struct fields in DECLARATION order. If the
+    /// SubjectAuditRow struct ever gains/loses/reorders fields (schema
+    /// evolution), the bytes change even though the LOGICAL content is
+    /// the same. A verifier compiled against the new struct shape
+    /// re-serializes a stored row and gets DIFFERENT bytes than the
+    /// original write — chain breaks silently. Caught by 2026-05-03
+    /// opus scrum BLOCK on subject_audit.rs:108.
+    ///
+    /// Canonical-JSON (RFC 8785-ish, simplified for our scalar types)
+    /// fixes this by using alphabetical key order + minimal whitespace,
+    /// stable across struct evolution.
+    fn canonical_row_bytes(row: &SubjectAuditRow) -> Result<Vec<u8>, String> {
+        // Project to a generic Value, drop row_hmac, then re-serialize
+        // through a BTreeMap so keys come out alphabetical.
+        let mut v: serde_json::Value = serde_json::to_value(row)
+            .map_err(|e| format!("audit row to value: {e}"))?;
+        if let Some(obj) = v.as_object_mut() {
+            obj.remove("row_hmac");
+        }
+        canonical_json(&v).map_err(|e| format!("canonicalize: {e}"))
+    }
+
+    /// Get-or-create per-subject lock. Held across the read-modify-write
+    /// in append() so concurrent appends to the same subject serialize.
+    async fn lock_for(&self, candidate_id: &str) -> Arc<Mutex<()>> {
+        let mut locks = self.subject_locks.lock().await;
+        locks
+            .entry(candidate_id.to_string())
+            .or_insert_with(|| Arc::new(Mutex::new(())))
+            .clone()
+    }
+
     /// Append one audit row for a subject. Computes the HMAC chain link,
     /// writes to the per-subject JSONL, returns the new chain root (which
     /// the caller MAY mirror to SubjectManifest.audit_log_chain_root).
     ///
+    /// **Concurrency**: appends to the SAME subject serialize via a
+    /// per-subject Mutex. Appends to DIFFERENT subjects run in parallel.
+    /// Without the per-subject lock, the read-modify-write in
+    /// append_line races and silently corrupts the chain. (2026-05-03
+    /// opus scrum BLOCK on subject_audit.rs:172.)
+    ///
     /// On error: logs and returns Err. Caller decides whether to propagate
     /// (per spec §3.2 the gateway tool registry SHOULD log + drop).
     pub async fn append(&self, mut row: SubjectAuditRow) -> Result<String, String> {
@@ -126,6 +215,10 @@ impl SubjectAuditWriter {
             return Err("audit row candidate_id is empty".into());
         }

+        // Acquire the per-subject lock for the duration of this append.
+        let subject_lock = self.lock_for(&cid).await;
+        let _guard = subject_lock.lock().await;
+
         // 1. Find prev_chain_hash. Cache hot path; cold path scans
         // the existing JSONL tail-style to find the last row.
         let prev = {
@@ -139,13 +232,14 @@ impl SubjectAuditWriter {
         row.prev_chain_hash = prev_hash.clone();
         row.row_hmac = String::new(); // Always cleared before HMAC computation.

-        // 2. Canonical-JSON the row WITHOUT the row_hmac field, compute MAC.
-        let canon = serde_json::to_vec(&row)
-            .map_err(|e| format!("canonicalize audit row: {e}"))?;
+        // 2. Canonical-JSON the row (sorted keys, no whitespace, no
+        // row_hmac), then HMAC. Stable across struct evolution.
+        let canon = Self::canonical_row_bytes(&row)?;
         let new_hmac = self.compute_hmac(&prev_hash, &canon);
         row.row_hmac = new_hmac.clone();

-        // 3. Append the row to the JSONL.
+        // 3. Append the row to the JSONL. The per-subject lock above
+        // serializes the read-modify-write inside append_line.
         let line = serde_json::to_vec(&row)
             .map_err(|e| format!("serialize audit row: {e}"))?;
         self.append_line(&cid, &line).await?;
@@ -221,7 +315,7 @@ impl SubjectAuditWriter {
             ));
         }
         let claimed = std::mem::take(&mut row.row_hmac);
-        let canon = serde_json::to_vec(&row)
+        let canon = Self::canonical_row_bytes(&row)
             .map_err(|e| format!("canonicalize line {}: {e}", lineno + 1))?;
         let recomputed = self.compute_hmac(&prev, &canon);
         if recomputed != claimed {
@@ -343,6 +437,71 @@ mod tests {
         assert!(err.contains("empty"), "got: {err}");
     }

+    #[tokio::test]
+    async fn concurrent_appends_to_same_subject_serialize() {
+        // 2026-05-03 opus scrum BLOCK regression: without per-subject
+        // locking, concurrent appends raced and silently lost rows.
+        // This test spawns 50 parallel appends and asserts ALL 50
+        // land with an intact chain.
+        let w = Arc::new(fixture_writer());
+        let mut handles = Vec::new();
+        for i in 0..50 {
+            let w = w.clone();
+            handles.push(tokio::spawn(async move {
+                w.append(fixture_row("CAND-RACE", &[&format!("f{i}")])).await
+            }));
+        }
+        for h in handles {
+            h.await.unwrap().unwrap();
+        }
+        let count = w.verify_chain("CAND-RACE").await.unwrap();
+        assert_eq!(count, 50, "expected 50 rows; chain was corrupted");
+    }
+
+    #[tokio::test]
+    async fn concurrent_appends_to_different_subjects_run_parallel() {
+        let w = Arc::new(fixture_writer());
+        let mut handles = Vec::new();
+        for i in 0..20 {
+            let w = w.clone();
+            handles.push(tokio::spawn(async move {
+                w.append(fixture_row(&format!("CAND-PARA-{i}"), &["f"])).await
+            }));
+        }
+        for h in handles {
+            h.await.unwrap().unwrap();
+        }
+        // Each of the 20 subjects has exactly 1 row.
+        for i in 0..20 {
+            assert_eq!(w.verify_chain(&format!("CAND-PARA-{i}")).await.unwrap(), 1);
+        }
+    }
+
+    #[test]
+    fn canonical_json_sorts_keys_alphabetically() {
+        let v = serde_json::json!({"z": 1, "a": 2, "m": {"y": 1, "b": 2}});
+        let bytes = canonical_json(&v).unwrap();
+        let s = std::str::from_utf8(&bytes).unwrap();
+        // Keys should appear in alphabetical order at every nesting level.
+        let a_pos = s.find("\"a\"").unwrap();
+        let m_pos = s.find("\"m\"").unwrap();
+        let z_pos = s.find("\"z\"").unwrap();
+        assert!(a_pos < m_pos);
+        assert!(m_pos < z_pos);
+        // Nested object's keys also sorted.
+        let b_pos = s.find("\"b\"").unwrap();
+        let y_pos = s.find("\"y\"").unwrap();
+        assert!(b_pos < y_pos);
+    }
+
+    #[test]
+    fn canonical_json_arrays_preserve_order() {
+        let v = serde_json::json!({"k": ["c", "a", "b"]});
+        let bytes = canonical_json(&v).unwrap();
+        let s = std::str::from_utf8(&bytes).unwrap();
+        assert!(s.contains("\"c\",\"a\",\"b\""), "got: {s}");
+    }
+
     #[tokio::test]
     async fn key_too_short_rejected_via_file() {
         // Write a 16-byte key file (under the 32-byte minimum).
diff --git a/crates/gateway/src/execution_loop/mod.rs b/crates/gateway/src/execution_loop/mod.rs
index 7e36830..22ee3fc 100644
--- a/crates/gateway/src/execution_loop/mod.rs
+++ b/crates/gateway/src/execution_loop/mod.rs
@@ -375,10 +375,20 @@ impl ExecutionLoop {
         if hits.is_empty() {
             return;
         }
-        // Fire one audit row per hit. Spawn so we don't add latency to
-        // the tool path even if the writer is slow.
+        // Determine result-state from the payload shape, NOT from the
+        // transport status. A tool can return Ok(Value) where the body
+        // itself is `{"error":"..."}` — hardcoding "success" here would
+        // mislead compliance review (opus WARN at execution_loop:401).
+        let result_state = audit_result_state(result);
+
+        // Sequential await — NOT tokio::spawn. The previous spawn-per-row
+        // shape fanned out concurrent writes that raced inside the
+        // per-subject append. The audit writer now serializes per-subject
+        // internally, but spawning would still flood those locks under
+        // burst load with no observable latency win. Inline await is
+        // simpler AND correct. (2026-05-03 opus scrum BLOCKs on
+        // execution_loop:391 + subject_audit:172.)
         for (cid, fields) in hits {
-            let audit = audit.clone();
             let row = shared::types::SubjectAuditRow {
                 schema: "subject_audit.v1".into(),
                 ts: chrono::Utc::now(),
@@ -390,15 +400,13 @@ impl ExecutionLoop {
                     trace_id: String::new(), // TODO: thread X-Lakehouse-Trace-Id through
                 },
                 fields_accessed: fields,
-                result: "success".into(),
+                result: result_state.clone(),
                 prev_chain_hash: String::new(),
                 row_hmac: String::new(),
             };
-            tokio::spawn(async move {
-                if let Err(e) = audit.append(row).await {
-                    tracing::warn!("subject audit write failed for {cid}: {e}");
-                }
-            });
+            if let Err(e) = audit.append(row).await {
+                tracing::warn!("subject audit write failed for {cid}: {e}");
+            }
         }
     }
@@ -1045,6 +1053,53 @@ async fn append_outcomes_row_at(
 /// PORT FROM orchestrator.ts:306-311. Cap `rows` at 20 entries and
 /// annotate the truncation so the executor sees it on the next turn

+/// Inspect a tool result JSON payload and return the appropriate
+/// audit `result` field per spec §3.2 enumeration:
+/// "success" | "denied" | "not_found" | "error".
+///
+/// Heuristic on the top-level object:
+/// - body has `"error"` key (truthy) → "error"
+/// - body has `"denied"` key (truthy) → "denied"
+/// - body has `"not_found"` key (truthy) → "not_found"
+/// - body has `"status":"denied"`/etc → that status
+/// - otherwise → "success"
+///
+/// Conservative — only flags explicit signals, defaults to success.
+/// Keeps audit log honest about ambiguous payloads (an error inside
+/// rows[*] doesn't downgrade the whole call to "error" — that's a
+/// row-level concern, not a result-state concern).
+pub(crate) fn audit_result_state(v: &serde_json::Value) -> String {
+    if let Some(obj) = v.as_object() {
+        if obj.get("error").map(is_truthy).unwrap_or(false) {
+            return "error".into();
+        }
+        if obj.get("denied").map(is_truthy).unwrap_or(false) {
+            return "denied".into();
+        }
+        if obj.get("not_found").map(is_truthy).unwrap_or(false) {
+            return "not_found".into();
+        }
+        if let Some(s) = obj.get("status").and_then(|v| v.as_str()) {
+            match s {
+                "denied" | "not_found" | "error" => return s.into(),
+                _ => {}
+            }
+        }
+    }
+    "success".into()
+}
+
+fn is_truthy(v: &serde_json::Value) -> bool {
+    match v {
+        serde_json::Value::Null => false,
+        serde_json::Value::Bool(b) => *b,
+        serde_json::Value::String(s) => !s.is_empty(),
+        serde_json::Value::Array(a) => !a.is_empty(),
+        serde_json::Value::Object(o) => !o.is_empty(),
+        serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
+    }
+}
+
 /// Walk a tool-result JSON value recursively. For each object that
 /// contains a candidate_id (string) or worker_id (int/string), append
 /// (canonical_candidate_id, fields_in_object) to `out`. Conventions
@@ -2050,4 +2105,29 @@ mod tests {
         collect_subject_hits(&v, &mut hits);
         assert_eq!(hits.len(), 0);
     }
+
+    #[test]
+    fn audit_result_state_default_is_success() {
+        let v = serde_json::json!({"rows": [{"candidate_id": "X"}]});
+        assert_eq!(audit_result_state(&v), "success");
+    }
+
+    #[test]
+    fn audit_result_state_detects_error_payload() {
+        let v = serde_json::json!({"error": "not found"});
+        assert_eq!(audit_result_state(&v), "error");
+    }
+
+    #[test]
+    fn audit_result_state_detects_status_field() {
+        let v = serde_json::json!({"status": "denied", "rows": []});
+        assert_eq!(audit_result_state(&v), "denied");
+    }
+
+    #[test]
+    fn audit_result_state_ignores_falsy_signals() {
+        // Empty/null/false should not trigger error state.
+        let v = serde_json::json!({"error": null, "denied": false, "rows": [{"candidate_id":"X"}]});
+        assert_eq!(audit_result_state(&v), "success");
+    }
 }