P42-002: wire truth gate into queryd /sql + /paged SQL paths
Some checks failed
lakehouse/auditor 1 blocking issue: cloud: claim not backed — "journal event verified live (total_events_created 0→1 after probe)."

The scrum master flagged crates/queryd/src/service.rs across iters 3-5
with the same finding: "raw SQL forwarded to DataFusion without schema
or policy gate; violates PRD §42-002 truth enforcement." Confidence
79-95%, gradient tier auto/dry_run. Applier couldn't touch it — the fix
is larger than 6 lines and crosses crate boundaries.

Hand-fix lands the missing enforcement point:

  - truth: new RuleCondition::FieldContainsAny { field, needles } with
    case-insensitive substring matching. 4 new unit tests cover the
    positive, negative, missing-field, and empty-needles paths.
  - truth: sql_query_guard_store() helper returns a baseline store that
    rejects destructive verbs (DROP/TRUNCATE/DELETE FROM) and empty SQL.
  - queryd: QueryState grows an Arc<TruthStore>; default router() loads
    sql_query_guard_store; new router_with_truth(engine, store) lets
    tests inject a custom store.
  - queryd: sql_policy_check() runs truth.evaluate("sql_query", ctx)
    before hitting DataFusion. Reject/Block actions on matched
    conditions short-circuit to HTTP 403 with the rule's message.
    Both /sql and /paged gated.
  - queryd: 7 new tests cover block/allow/case-insensitive/false-
    positive scenarios. "SELECT deleted_at FROM t" must NOT be rejected
    (substring match is narrow: "delete from", not "delete").

Total: 28 truth tests green (was 24), 7 new queryd policy tests green.
Workspace baseline warnings unchanged at 11.

This is a signal-driven fix the mechanical pipeline couldn't produce
but the scrum master kept asking for. Closes one of four LOOPING files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-24 04:38:52 -05:00
parent 5e8d87bf34
commit 9cc0ceb894
3 changed files with 232 additions and 0 deletions

View File

@ -7,6 +7,7 @@ edition = "2024"
shared = { path = "../shared" } shared = { path = "../shared" }
catalogd = { path = "../catalogd" } catalogd = { path = "../catalogd" }
storaged = { path = "../storaged" } storaged = { path = "../storaged" }
truth = { path = "../truth" }
tokio = { workspace = true } tokio = { workspace = true }
axum = { workspace = true } axum = { workspace = true }
serde = { workspace = true } serde = { workspace = true }

View File

@ -9,6 +9,9 @@ use axum::{
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc;
use truth::{RuleAction, TruthStore};
use crate::context::QueryEngine; use crate::context::QueryEngine;
use crate::delta; use crate::delta;
use crate::paged::ResultStore; use crate::paged::ResultStore;
@ -17,12 +20,26 @@ use crate::paged::ResultStore;
pub struct QueryState { pub struct QueryState {
pub engine: QueryEngine, pub engine: QueryEngine,
pub result_store: ResultStore, pub result_store: ResultStore,
// Policy gate for incoming SQL. Every /sql and /paged request is
// evaluated against this store before hitting DataFusion. Added for
// P42-002 ("raw SQL forwarded without schema or policy gate") after
// the scrum master's queryd/service.rs finding looped across iters
// 3-5 without ever being reachable by the 6-line auto-applier.
pub truth: Arc<TruthStore>,
} }
pub fn router(engine: QueryEngine) -> Router { pub fn router(engine: QueryEngine) -> Router {
router_with_truth(engine, Arc::new(truth::sql_query_guard_store()))
}
/// Test/integration hook: construct the router with a caller-supplied
/// TruthStore so tests can assert reject/pass behavior deterministically
/// without depending on the default needle list.
pub fn router_with_truth(engine: QueryEngine, truth: Arc<TruthStore>) -> Router {
let state = QueryState { let state = QueryState {
engine: engine.clone(), engine: engine.clone(),
result_store: ResultStore::new(100, 50), // 100 rows/page, keep 50 results result_store: ResultStore::new(100, 50), // 100 rows/page, keep 50 results
truth,
}; };
Router::new() Router::new()
.route("/health", get(health)) .route("/health", get(health))
@ -71,12 +88,40 @@ fn batches_to_json(batches: &[RecordBatch]) -> Result<serde_json::Value, String>
serde_json::from_slice(&buf).map_err(|e| format!("JSON parse error: {e}")) serde_json::from_slice(&buf).map_err(|e| format!("JSON parse error: {e}"))
} }
/// Evaluate the request SQL against the configured TruthStore. Returns
/// the Reject/Block message on the first failing mandatory rule so the
/// handler can short-circuit. Returns None when all rules pass (or when
/// the failures' declared action is non-mandatory like Redact/Pass).
fn sql_policy_check(truth: &TruthStore, sql: &str) -> Option<String> {
let ctx = serde_json::json!({ "sql": sql });
for outcome in truth.evaluate("sql_query", &ctx) {
if !outcome.passed {
// FieldEmpty / FieldContainsAny etc. are enforced only when
// condition HOLDS (i.e. passed=true). Below means "passed=false",
// so the rule condition did not hold — no enforcement.
continue;
}
match &outcome.action {
RuleAction::Reject { message } | RuleAction::Block { message } => {
return Some(message.clone());
}
_ => {}
}
}
None
}
async fn execute_query( async fn execute_query(
State(state): State<QueryState>, State(state): State<QueryState>,
Json(req): Json<QueryRequest>, Json(req): Json<QueryRequest>,
) -> impl IntoResponse { ) -> impl IntoResponse {
tracing::info!("executing query: {}", req.sql); tracing::info!("executing query: {}", req.sql);
if let Some(reason) = sql_policy_check(&state.truth, &req.sql) {
tracing::warn!("sql rejected by truth gate: {reason}");
return Err((StatusCode::FORBIDDEN, reason));
}
match state.engine.query(&req.sql).await { match state.engine.query(&req.sql).await {
Ok(batches) => { Ok(batches) => {
if batches.is_empty() { if batches.is_empty() {
@ -115,6 +160,10 @@ async fn paged_query(
Json(req): Json<QueryRequest>, Json(req): Json<QueryRequest>,
) -> impl IntoResponse { ) -> impl IntoResponse {
tracing::info!("paged query: {}", req.sql); tracing::info!("paged query: {}", req.sql);
if let Some(reason) = sql_policy_check(&state.truth, &req.sql) {
tracing::warn!("paged sql rejected by truth gate: {reason}");
return Err((StatusCode::FORBIDDEN, reason));
}
match state.result_store.execute_and_store(&state.engine, &req.sql).await { match state.result_store.execute_and_store(&state.engine, &req.sql).await {
Ok(handle) => Ok(Json(handle)), Ok(handle) => Ok(Json(handle)),
Err(e) => Err((StatusCode::BAD_REQUEST, e)), Err(e) => Err((StatusCode::BAD_REQUEST, e)),
@ -211,3 +260,65 @@ async fn compact_dataset(
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
} }
} }
#[cfg(test)]
mod sql_policy_tests {
use super::*;
use truth::sql_query_guard_store;
// These tests exercise the policy gate without spinning up a DataFusion
// engine — they only need `TruthStore`. Purpose: prove the P42-002
// enforcement point actually rejects destructive SQL. This is the
// regression guard for the queryd/service.rs finding that looped
// across scrum iters 3-5.
#[test]
fn blocks_drop_table() {
let store = sql_query_guard_store();
let reason = sql_policy_check(&store, "DROP TABLE users").expect("must reject");
assert!(reason.contains("destructive"), "reason: {reason}");
}
#[test]
fn blocks_delete_from() {
let store = sql_query_guard_store();
assert!(sql_policy_check(&store, "delete from t where 1=1").is_some());
}
#[test]
fn blocks_truncate() {
let store = sql_query_guard_store();
assert!(sql_policy_check(&store, "TRUNCATE workers").is_some());
}
#[test]
fn blocks_empty_sql() {
let store = sql_query_guard_store();
assert!(sql_policy_check(&store, "").is_some());
}
#[test]
fn allows_benign_select() {
let store = sql_query_guard_store();
assert!(sql_policy_check(&store, "SELECT count(*) FROM workers").is_none());
}
#[test]
fn allows_select_with_deleted_word_in_column() {
// Substring match is narrow ("delete from", not "delete"), so a
// column named `deleted_at` doesn't trip the guard. Important
// check — false positives on benign queries would make the gate
// unusable in practice.
let store = sql_query_guard_store();
assert!(
sql_policy_check(&store, "SELECT deleted_at FROM t").is_none(),
"column names containing 'delete' must not be rejected"
);
}
#[test]
fn case_insensitive_match_catches_mixed_case() {
let store = sql_query_guard_store();
assert!(sql_policy_check(&store, "Drop Table X").is_some());
}
}

View File

@ -18,6 +18,11 @@ pub enum RuleCondition {
FieldMismatch { field: String, value: String }, FieldMismatch { field: String, value: String },
FieldEmpty { field: String }, FieldEmpty { field: String },
FieldGreater { field: String, threshold: i64 }, FieldGreater { field: String, threshold: i64 },
// Case-insensitive substring scan — true if the field value contains
// ANY of `needles`. Added for SQL/command guards where rules of the
// form "sql must not contain DROP/DELETE/TRUNCATE" need to express
// enforcement as a passing precondition being absent.
FieldContainsAny { field: String, needles: Vec<String> },
} }
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@ -124,6 +129,15 @@ fn evaluate_condition(cond: &RuleCondition, ctx: &serde_json::Value) -> bool {
.map(|n| n > *threshold) .map(|n| n > *threshold)
.unwrap_or(false) .unwrap_or(false)
} }
RuleCondition::FieldContainsAny { field, needles } => {
match field_as_string(ctx, field) {
None => false,
Some(s) => {
let haystack = s.to_ascii_lowercase();
needles.iter().any(|n| haystack.contains(&n.to_ascii_lowercase()))
}
}
}
} }
} }
@ -147,6 +161,49 @@ fn field_as_string(ctx: &serde_json::Value, path: &str) -> Option<String> {
}) })
} }
/// Minimal SQL guard — rejects destructive verbs (DROP/TRUNCATE/DELETE).
/// queryd/src/service.rs loads this into its `QueryState` and evaluates
/// every `/sql` request against it before hitting the DataFusion engine.
/// This is the P42-002 enforcement point flagged across scrum iters 3-5
/// ("raw SQL forwarded without schema or policy gate").
///
/// Intentionally narrow: it's a safety net, not a full SQL parser. If
/// callers need richer AST-aware enforcement they should extend this with
/// structured rules rather than new needles.
pub fn sql_query_guard_store() -> TruthStore {
let mut store = TruthStore::new();
store.add_rule(TruthRule {
id: "no-destructive-sql".to_string(),
task_class: "sql_query".to_string(),
description: "SQL must not contain destructive verbs".to_string(),
condition: RuleCondition::FieldContainsAny {
field: "sql".to_string(),
needles: vec![
"drop table".to_string(),
"drop schema".to_string(),
"drop database".to_string(),
"truncate".to_string(),
"delete from".to_string(),
],
},
action: RuleAction::Reject {
message: "destructive SQL rejected by truth.sql_query_guard".to_string(),
},
});
store.add_rule(TruthRule {
id: "sql-not-empty".to_string(),
task_class: "sql_query".to_string(),
description: "SQL must not be empty".to_string(),
condition: RuleCondition::FieldEmpty {
field: "sql".to_string(),
},
action: RuleAction::Reject {
message: "empty SQL rejected".to_string(),
},
});
store
}
pub fn default_truth_store() -> TruthStore { pub fn default_truth_store() -> TruthStore {
let mut store = TruthStore::new(); let mut store = TruthStore::new();
@ -520,4 +577,67 @@ mod tests {
let actions = s.check("t"); let actions = s.check("t");
assert_eq!(actions.len(), 3, "check returns one action per rule regardless of condition"); assert_eq!(actions.len(), 3, "check returns one action per rule regardless of condition");
} }
fn sql_guard_store() -> TruthStore {
let mut s = TruthStore::new();
s.add_rule(TruthRule {
id: "no-destructive".into(),
task_class: "sql_query".into(),
description: "SQL must not contain destructive verbs".into(),
condition: RuleCondition::FieldContainsAny {
field: "sql".into(),
needles: vec![
"drop table".into(),
"drop schema".into(),
"truncate".into(),
"delete from".into(),
],
},
action: RuleAction::Reject {
message: "destructive SQL rejected".into(),
},
});
s
}
#[test]
fn field_contains_any_matches_case_insensitively() {
let s = sql_guard_store();
let ctx = serde_json::json!({"sql": "SELECT * FROM t; DROP TABLE users;"});
let o = s.evaluate("sql_query", &ctx);
assert!(o[0].passed, "condition holds when needle present (case-insensitive)");
}
#[test]
fn field_contains_any_is_false_when_no_needle_matches() {
let s = sql_guard_store();
let ctx = serde_json::json!({"sql": "SELECT count(*) FROM workers"});
let o = s.evaluate("sql_query", &ctx);
assert!(!o[0].passed, "benign SELECT should not match destructive needles");
}
#[test]
fn field_contains_any_false_when_field_missing() {
let s = sql_guard_store();
let ctx = serde_json::json!({});
let o = s.evaluate("sql_query", &ctx);
assert!(!o[0].passed, "missing field → condition cannot hold");
}
#[test]
fn field_contains_any_empty_needles_list_never_matches() {
let mut s = TruthStore::new();
s.add_rule(TruthRule {
id: "empty".into(),
task_class: "x".into(),
description: "".into(),
condition: RuleCondition::FieldContainsAny {
field: "sql".into(),
needles: vec![],
},
action: RuleAction::Pass,
});
let o = s.evaluate("x", &serde_json::json!({"sql": "anything"}));
assert!(!o[0].passed, "no needles → any::<bool> is false");
}
} }