catalogd: Step 8 — parity_subject_audit binary (Rust side)

Per docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 8.

Cross-runtime parity helper consumed by:
  golangLAKEHOUSE/scripts/cutover/parity/subject_audit_parity.sh

Two modes:

  --known-answer
    Print canonical-JSON + HMAC for a hardcoded fixture row. The Go
    helper at golangLAKEHOUSE/scripts/cutover/parity/subject_audit_helper/
    must produce byte-identical output. Catches algorithm drift
    (canonical-JSON sort order, HMAC algorithm, hex encoding).

  --verify <audit_log_path> --key <key_path>
    Replay the chain on a real production audit log via the live
    SubjectAuditWriter::verify_chain (no re-implementation; the actual
    production verification path). Output: one JSON line with mode,
    count, tip, verified, error.

The helper exercises the SAME verify_chain path the gateway calls, so
algorithm changes in subject_audit.rs automatically flow into the
parity probe.

Live-verified against 5 production audit logs in data/_catalog/subjects;
all 6 parity assertions pass after fixing two real cross-runtime drifts
on the Go side (omitempty trace_id stripping field; time.RFC3339Nano
stripping trailing zero in nanoseconds — both caught by this probe).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-05-03 04:16:50 -05:00
parent 8fc6238dea
commit 2413c96817

View File

@ -0,0 +1,220 @@
//! Cross-runtime parity helper for subject-audit chain.
//!
//! Specification: docs/specs/SUBJECT_MANIFESTS_ON_CATALOGD.md §5 Step 8.
//!
//! This binary is consumed by scripts/cutover/parity/subject_audit_parity.sh
//! (which lives in /home/profit/golangLAKEHOUSE/scripts/cutover/parity/).
//! Its Go counterpart is at golangLAKEHOUSE/scripts/cutover/parity/subject_audit_helper/main.go.
//!
//! Both helpers MUST produce byte-identical output for the same inputs.
//! Divergence here is a parity break — a SubjectManifest written by Rust
//! that Go cannot verify, or vice versa.
//!
//! Two modes:
//!
//! --known-answer
//! Print the canonical bytes + HMAC of a hardcoded fixture row.
//! The Go helper must produce IDENTICAL bytes + IDENTICAL hash.
//! Hardcoded fixture matches Go test TestKnownAnswerVector.
//!
//! --verify <audit_log_path> --key <key_path>
//! Parse the JSONL audit log, replay the HMAC chain. Print
//! JSON: {"mode": "verify", "count": N, "tip": "<hash|GENESIS>", "verified": bool, "error": "<msg|null>"}.
//!
//! Output format: exactly ONE JSON object per invocation, written to stdout
//! and followed by a terminating newline. The parity script diffs Rust
//! stdout vs Go stdout via `diff -q`.
use catalogd::subject_audit::SubjectAuditWriter;
use hmac::{Hmac, Mac};
use object_store::ObjectStore;
use object_store::memory::InMemory;
use serde::Serialize;
use sha2::Sha256;
use shared::types::{AuditAccessor, SubjectAuditRow};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
type HmacSha256 = Hmac<Sha256>;
const GENESIS: &str = "GENESIS";
/// Serialize `v` as canonical JSON: every object, at every nesting depth,
/// is rebuilt with its keys in ascending order before the value is written
/// out as compact bytes.
///
/// This mirrors the canonical-JSON algorithm in
/// crates/catalogd/src/subject_audit.rs. It is duplicated here so the helper
/// does not depend on internal-only items in that file. If the algorithm
/// there changes, this copy AND the Go helper must change in lockstep.
fn canonical_json(v: &serde_json::Value) -> Vec<u8> {
    /// Return a deep copy of `node` in which every object has sorted keys.
    fn sort_keys(node: &serde_json::Value) -> serde_json::Value {
        if let serde_json::Value::Object(fields) = node {
            // Route the entries through a BTreeMap so they come back out in
            // key order regardless of how the source map stores them.
            let mut ordered: BTreeMap<String, serde_json::Value> = BTreeMap::new();
            for (name, child) in fields {
                ordered.insert(name.clone(), sort_keys(child));
            }
            return serde_json::Value::Object(ordered.into_iter().collect());
        }
        if let serde_json::Value::Array(items) = node {
            // Arrays keep their element order; only nested objects change.
            return serde_json::Value::Array(items.iter().map(sort_keys).collect());
        }
        // Scalars (null, bool, number, string) are copied unchanged.
        node.clone()
    }

    let normalized = sort_keys(v);
    serde_json::to_vec(&normalized).expect("canonical-json serialize")
}
/// Produce the canonical bytes that are fed to the HMAC for `row`.
///
/// The row is converted to a JSON value, its `row_hmac` field is dropped
/// (the HMAC cannot cover itself), and the remainder is serialized with
/// [`canonical_json`].
fn canonical_row_bytes(row: &SubjectAuditRow) -> Vec<u8> {
    let mut value = serde_json::to_value(row).expect("row to value");
    if let serde_json::Value::Object(fields) = &mut value {
        fields.remove("row_hmac");
    }
    canonical_json(&value)
}
/// Compute the chain HMAC for one row and return it as lowercase hex.
///
/// The MAC is HMAC-SHA256 keyed with `key`, fed the bytes of `prev`
/// followed by `canonical`, with no separator between the two.
fn compute_hmac(key: &[u8], prev: &str, canonical: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let mut signer =
        <HmacSha256 as Mac>::new_from_slice(key).expect("HMAC accepts any key length");
    signer.update(prev.as_bytes());
    signer.update(canonical);
    let digest = signer.finalize().into_bytes();

    // Two hex digits per digest byte: high nibble first, then low nibble.
    digest
        .iter()
        .fold(String::with_capacity(64), |mut hex, &byte| {
            hex.push(HEX[usize::from(byte >> 4)] as char);
            hex.push(HEX[usize::from(byte & 0x0f)] as char);
            hex
        })
}
/// Fixed HMAC key for the known-answer fixture: the 32 bytes
/// `0x00, 0x01, ..., 0x1f` in ascending order.
fn deterministic_key() -> Vec<u8> {
    let mut key = Vec::with_capacity(32);
    for byte in 0u8..32 {
        key.push(byte);
    }
    key
}
/// JSON object printed to stdout by `--known-answer` mode.
///
/// The derived `Serialize` impl emits fields in declaration order, and the
/// output is compared byte-for-byte against the Go helper, so do not
/// reorder or rename fields without changing the Go side too.
#[derive(Serialize)]
struct KnownAnswerOut {
/// Mode tag; `known_answer()` sets it to the literal "known_answer".
mode: &'static str,
/// Canonical JSON of the fixture row (with `row_hmac` removed), as UTF-8 text.
canonical: String,
/// Lowercase-hex HMAC computed over the GENESIS marker plus the canonical bytes.
hmac: String,
/// Length in bytes of the canonical JSON.
canonical_bytes_len: usize,
}
/// JSON object printed to stdout by `--verify` mode.
///
/// The derived `Serialize` impl emits fields in declaration order, and the
/// output is compared byte-for-byte against the Go helper, so do not
/// reorder or rename fields without changing the Go side too.
#[derive(Serialize)]
struct VerifyOut {
/// Mode tag; `main` sets it to the literal "verify".
mode: &'static str,
/// Value returned by `verify_chain` on success; 0 when verification failed.
count: usize,
/// Chain tip reported by the writer on success; "GENESIS" when verification failed.
tip: String,
/// True when `verify_chain` returned `Ok`.
verified: bool,
/// Error message from `verify_chain`; `None` (serialized as JSON null) on success.
error: Option<String>,
}
/// Run `--known-answer` mode: build the hardcoded fixture row, compute its
/// canonical bytes and HMAC under the deterministic key, and print the
/// result as a single JSON line on stdout.
///
/// Every input is a compile-time constant, so the output is identical on
/// every run. The Go helper must print exactly the same line.
///
/// # Panics
///
/// Only if an internal invariant breaks (the fixture timestamp stops
/// parsing, or serialization of plain strings and integers fails).
fn known_answer() {
    let ts = chrono::DateTime::parse_from_rfc3339("2026-05-03T12:00:00Z")
        .expect("fixture timestamp is valid RFC 3339")
        .with_timezone(&chrono::Utc);

    let row = SubjectAuditRow {
        schema: "subject_audit.v1".into(),
        ts,
        candidate_id: "WORKER-FIXED".into(),
        accessor: AuditAccessor {
            kind: "gateway_lookup".into(),
            daemon: "gateway".into(),
            purpose: "parity_test".into(),
            trace_id: "trace-fixed".into(),
        },
        fields_accessed: vec!["name".into()],
        result: "success".into(),
        prev_chain_hash: GENESIS.into(),
        // Left empty: canonical_row_bytes strips this field before hashing.
        row_hmac: String::new(),
    };

    let canon = canonical_row_bytes(&row);
    let hmac = compute_hmac(&deterministic_key(), GENESIS, &canon);

    // Capture the length first so the buffer can be moved into the String
    // below instead of being cloned.
    let canonical_bytes_len = canon.len();
    let out = KnownAnswerOut {
        mode: "known_answer",
        canonical: String::from_utf8(canon).expect("canonical is utf-8"),
        hmac,
        canonical_bytes_len,
    };
    println!(
        "{}",
        serde_json::to_string(&out).expect("known-answer output serializes to JSON")
    );
}
/// Command-line entry point for the parity helper.
///
/// Recognised arguments:
///
/// * `--known-answer` — print the fixture canonical bytes + HMAC and exit.
///   Takes precedence when combined with `--verify`.
/// * `--verify <audit_log_path>` together with `--key <key_path>` — load the
///   audit log into an in-memory object store and run the production
///   `SubjectAuditWriter::verify_chain` on it, then print one JSON line.
/// * `-h` / `--help` — print usage to stderr and exit 0.
///
/// Exit codes:
///
/// * `0` — a JSON line was printed. This includes a verify run whose chain
///   failed verification; that is reported as `"verified": false` in the JSON.
/// * `1` — the key file or audit log could not be read.
/// * `2` — usage error (unknown flag, missing flag value, missing mode,
///   missing `--key`, or a log file name that does not end in
///   `.audit.jsonl`).
///
/// Bad input is reported as a one-line message on stderr rather than a
/// panic, matching how unknown flags are handled.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    /// Print `msg` to stderr and terminate the process with `code`.
    fn fail(code: i32, msg: impl std::fmt::Display) -> ! {
        eprintln!("{msg}");
        std::process::exit(code);
    }

    let mut mode_known_answer = false;
    let mut audit_path: Option<PathBuf> = None;
    let mut key_path: Option<PathBuf> = None;

    // Skip argv[0]; flags that take a value consume the following argument.
    let mut args = std::env::args().skip(1);
    while let Some(arg) = args.next() {
        match arg.as_str() {
            "--known-answer" => mode_known_answer = true,
            "--verify" => match args.next() {
                Some(path) => audit_path = Some(PathBuf::from(path)),
                None => fail(2, "--verify needs path"),
            },
            "--key" => match args.next() {
                Some(path) => key_path = Some(PathBuf::from(path)),
                None => fail(2, "--key needs path"),
            },
            "-h" | "--help" => {
                eprintln!("parity_subject_audit --known-answer");
                eprintln!("parity_subject_audit --verify <audit_log_path> --key <key_path>");
                std::process::exit(0);
            }
            other => fail(2, format!("unknown arg: {other}")),
        }
    }

    if mode_known_answer {
        known_answer();
        return;
    }

    let audit_path =
        audit_path.unwrap_or_else(|| fail(2, "need --known-answer OR --verify ... --key ..."));
    let key_path = key_path.unwrap_or_else(|| fail(2, "--verify also needs --key"));

    let key = std::fs::read(&key_path)
        .unwrap_or_else(|e| fail(1, format!("read key file {}: {e}", key_path.display())));

    // The candidate id is the log's file name minus the ".audit.jsonl" suffix.
    let candidate_id = audit_path
        .file_name()
        .and_then(|name| name.to_str())
        .and_then(|name| name.strip_suffix(".audit.jsonl"))
        .unwrap_or_else(|| fail(2, "audit log path must end with <candidate_id>.audit.jsonl"))
        .to_string();

    // Stand up an in-memory object store, seed it with the audit log
    // bytes at the canonical key, then ask SubjectAuditWriter to verify.
    // This way we exercise the SAME verify_chain function the production
    // gateway calls — not a re-implementation that might drift.
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let bytes = std::fs::read(&audit_path)
        .unwrap_or_else(|e| fail(1, format!("read audit log {}: {e}", audit_path.display())));
    let log_key = format!("_catalog/subjects/{}.audit.jsonl", candidate_id);
    // Writing to a fresh in-memory store is not expected to fail, so a
    // failure here is treated as a bug rather than a user error.
    storaged::ops::put(&store, &log_key, bytes::Bytes::from(bytes))
        .await
        .expect("seed object store");

    let writer = SubjectAuditWriter::with_inline_key(store, key);
    let result = writer.verify_chain(&candidate_id).await;
    let tip = writer.chain_tip(&candidate_id).await.unwrap_or(GENESIS.into());

    let out = match result {
        Ok(count) => VerifyOut {
            mode: "verify",
            count,
            tip,
            verified: true,
            error: None,
        },
        // On failure the count and tip are reset to fixed values, so the
        // output depends only on the error message.
        Err(e) => VerifyOut {
            mode: "verify",
            count: 0,
            tip: GENESIS.into(),
            verified: false,
            error: Some(e),
        },
    };
    println!(
        "{}",
        serde_json::to_string(&out).expect("verify output serializes to JSON")
    );
}