diff --git a/crates/queryd/Cargo.toml b/crates/queryd/Cargo.toml index 4064f63..be1618f 100644 --- a/crates/queryd/Cargo.toml +++ b/crates/queryd/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" shared = { path = "../shared" } catalogd = { path = "../catalogd" } storaged = { path = "../storaged" } +truth = { path = "../truth" } tokio = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/queryd/src/service.rs b/crates/queryd/src/service.rs index 7f0028c..8842af7 100644 --- a/crates/queryd/src/service.rs +++ b/crates/queryd/src/service.rs @@ -9,6 +9,9 @@ use axum::{ }; use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use truth::{RuleAction, TruthStore}; + use crate::context::QueryEngine; use crate::delta; use crate::paged::ResultStore; @@ -17,12 +20,26 @@ use crate::paged::ResultStore; pub struct QueryState { pub engine: QueryEngine, pub result_store: ResultStore, + // Policy gate for incoming SQL. Every /sql and /paged request is + // evaluated against this store before hitting DataFusion. Added for + // P42-002 ("raw SQL forwarded without schema or policy gate") after + // the scrum master's queryd/service.rs finding looped across iters + // 3-5 without ever being reachable by the 6-line auto-applier. + pub truth: Arc, } pub fn router(engine: QueryEngine) -> Router { + router_with_truth(engine, Arc::new(truth::sql_query_guard_store())) +} + +/// Test/integration hook: construct the router with a caller-supplied +/// TruthStore so tests can assert reject/pass behavior deterministically +/// without depending on the default needle list. +pub fn router_with_truth(engine: QueryEngine, truth: Arc) -> Router { let state = QueryState { engine: engine.clone(), result_store: ResultStore::new(100, 50), // 100 rows/page, keep 50 results + truth, }; Router::new() .route("/health", get(health)) @@ -71,12 +88,40 @@ fn batches_to_json(batches: &[RecordBatch]) -> Result serde_json::from_slice(&buf).map_err(|e| format!("JSON parse error: {e}")) } +/// Evaluate the request SQL against the configured TruthStore. Returns +/// the Reject/Block message on the first failing mandatory rule so the +/// handler can short-circuit. Returns None when all rules pass (or when +/// the failures' declared action is non-mandatory like Redact/Pass). +fn sql_policy_check(truth: &TruthStore, sql: &str) -> Option { + let ctx = serde_json::json!({ "sql": sql }); + for outcome in truth.evaluate("sql_query", &ctx) { + if !outcome.passed { + // FieldEmpty / FieldContainsAny etc. are enforced only when + // condition HOLDS (i.e. passed=true). Below means "passed=false", + // so the rule condition did not hold — no enforcement. + continue; + } + match &outcome.action { + RuleAction::Reject { message } | RuleAction::Block { message } => { + return Some(message.clone()); + } + _ => {} + } + } + None +} + async fn execute_query( State(state): State, Json(req): Json, ) -> impl IntoResponse { tracing::info!("executing query: {}", req.sql); + if let Some(reason) = sql_policy_check(&state.truth, &req.sql) { + tracing::warn!("sql rejected by truth gate: {reason}"); + return Err((StatusCode::FORBIDDEN, reason)); + } + match state.engine.query(&req.sql).await { Ok(batches) => { if batches.is_empty() { @@ -115,6 +160,10 @@ async fn paged_query( Json(req): Json, ) -> impl IntoResponse { tracing::info!("paged query: {}", req.sql); + if let Some(reason) = sql_policy_check(&state.truth, &req.sql) { + tracing::warn!("paged sql rejected by truth gate: {reason}"); + return Err((StatusCode::FORBIDDEN, reason)); + } match state.result_store.execute_and_store(&state.engine, &req.sql).await { Ok(handle) => Ok(Json(handle)), Err(e) => Err((StatusCode::BAD_REQUEST, e)), @@ -211,3 +260,65 @@ async fn compact_dataset( Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), } } + +#[cfg(test)] +mod sql_policy_tests { + use super::*; + use truth::sql_query_guard_store; + + // These tests exercise the policy gate without spinning up a DataFusion + // engine — they only need `TruthStore`. Purpose: prove the P42-002 + // enforcement point actually rejects destructive SQL. This is the + // regression guard for the queryd/service.rs finding that looped + // across scrum iters 3-5. + + #[test] + fn blocks_drop_table() { + let store = sql_query_guard_store(); + let reason = sql_policy_check(&store, "DROP TABLE users").expect("must reject"); + assert!(reason.contains("destructive"), "reason: {reason}"); + } + + #[test] + fn blocks_delete_from() { + let store = sql_query_guard_store(); + assert!(sql_policy_check(&store, "delete from t where 1=1").is_some()); + } + + #[test] + fn blocks_truncate() { + let store = sql_query_guard_store(); + assert!(sql_policy_check(&store, "TRUNCATE workers").is_some()); + } + + #[test] + fn blocks_empty_sql() { + let store = sql_query_guard_store(); + assert!(sql_policy_check(&store, "").is_some()); + } + + #[test] + fn allows_benign_select() { + let store = sql_query_guard_store(); + assert!(sql_policy_check(&store, "SELECT count(*) FROM workers").is_none()); + } + + #[test] + fn allows_select_with_deleted_word_in_column() { + // Substring match is narrow ("delete from", not "delete"), so a + // column named `deleted_at` doesn't trip the guard. Important + // check — false positives on benign queries would make the gate + // unusable in practice. + let store = sql_query_guard_store(); + assert!( + sql_policy_check(&store, "SELECT deleted_at FROM t").is_none(), + "column names containing 'delete' must not be rejected" + ); + } + + #[test] + fn case_insensitive_match_catches_mixed_case() { + let store = sql_query_guard_store(); + assert!(sql_policy_check(&store, "Drop Table X").is_some()); + } +} diff --git a/crates/truth/src/lib.rs b/crates/truth/src/lib.rs index 0fd57c3..930f59d 100644 --- a/crates/truth/src/lib.rs +++ b/crates/truth/src/lib.rs @@ -18,6 +18,11 @@ pub enum RuleCondition { FieldMismatch { field: String, value: String }, FieldEmpty { field: String }, FieldGreater { field: String, threshold: i64 }, + // Case-insensitive substring scan — true if the field value contains + // ANY of `needles`. Added for SQL/command guards where rules of the + // form "sql must not contain DROP/DELETE/TRUNCATE" need to express + // enforcement as a passing precondition being absent. + FieldContainsAny { field: String, needles: Vec }, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -124,6 +129,15 @@ fn evaluate_condition(cond: &RuleCondition, ctx: &serde_json::Value) -> bool { .map(|n| n > *threshold) .unwrap_or(false) } + RuleCondition::FieldContainsAny { field, needles } => { + match field_as_string(ctx, field) { + None => false, + Some(s) => { + let haystack = s.to_ascii_lowercase(); + needles.iter().any(|n| haystack.contains(&n.to_ascii_lowercase())) + } + } + } } } @@ -147,6 +161,49 @@ fn field_as_string(ctx: &serde_json::Value, path: &str) -> Option { }) } +/// Minimal SQL guard — rejects destructive verbs (DROP/TRUNCATE/DELETE). +/// queryd/src/service.rs loads this into its `QueryState` and evaluates +/// every `/sql` request against it before hitting the DataFusion engine. +/// This is the P42-002 enforcement point flagged across scrum iters 3-5 +/// ("raw SQL forwarded without schema or policy gate"). +/// +/// Intentionally narrow: it's a safety net, not a full SQL parser. If +/// callers need richer AST-aware enforcement they should extend this with +/// structured rules rather than new needles. +pub fn sql_query_guard_store() -> TruthStore { + let mut store = TruthStore::new(); + store.add_rule(TruthRule { + id: "no-destructive-sql".to_string(), + task_class: "sql_query".to_string(), + description: "SQL must not contain destructive verbs".to_string(), + condition: RuleCondition::FieldContainsAny { + field: "sql".to_string(), + needles: vec![ + "drop table".to_string(), + "drop schema".to_string(), + "drop database".to_string(), + "truncate".to_string(), + "delete from".to_string(), + ], + }, + action: RuleAction::Reject { + message: "destructive SQL rejected by truth.sql_query_guard".to_string(), + }, + }); + store.add_rule(TruthRule { + id: "sql-not-empty".to_string(), + task_class: "sql_query".to_string(), + description: "SQL must not be empty".to_string(), + condition: RuleCondition::FieldEmpty { + field: "sql".to_string(), + }, + action: RuleAction::Reject { + message: "empty SQL rejected".to_string(), + }, + }); + store +} + pub fn default_truth_store() -> TruthStore { let mut store = TruthStore::new(); @@ -520,4 +577,67 @@ mod tests { let actions = s.check("t"); assert_eq!(actions.len(), 3, "check returns one action per rule regardless of condition"); } + + fn sql_guard_store() -> TruthStore { + let mut s = TruthStore::new(); + s.add_rule(TruthRule { + id: "no-destructive".into(), + task_class: "sql_query".into(), + description: "SQL must not contain destructive verbs".into(), + condition: RuleCondition::FieldContainsAny { + field: "sql".into(), + needles: vec![ + "drop table".into(), + "drop schema".into(), + "truncate".into(), + "delete from".into(), + ], + }, + action: RuleAction::Reject { + message: "destructive SQL rejected".into(), + }, + }); + s + } + + #[test] + fn field_contains_any_matches_case_insensitively() { + let s = sql_guard_store(); + let ctx = serde_json::json!({"sql": "SELECT * FROM t; DROP TABLE users;"}); + let o = s.evaluate("sql_query", &ctx); + assert!(o[0].passed, "condition holds when needle present (case-insensitive)"); + } + + #[test] + fn field_contains_any_is_false_when_no_needle_matches() { + let s = sql_guard_store(); + let ctx = serde_json::json!({"sql": "SELECT count(*) FROM workers"}); + let o = s.evaluate("sql_query", &ctx); + assert!(!o[0].passed, "benign SELECT should not match destructive needles"); + } + + #[test] + fn field_contains_any_false_when_field_missing() { + let s = sql_guard_store(); + let ctx = serde_json::json!({}); + let o = s.evaluate("sql_query", &ctx); + assert!(!o[0].passed, "missing field → condition cannot hold"); + } + + #[test] + fn field_contains_any_empty_needles_list_never_matches() { + let mut s = TruthStore::new(); + s.add_rule(TruthRule { + id: "empty".into(), + task_class: "x".into(), + description: "".into(), + condition: RuleCondition::FieldContainsAny { + field: "sql".into(), + needles: vec![], + }, + action: RuleAction::Pass, + }); + let o = s.evaluate("x", &serde_json::json!({"sql": "anything"})); + assert!(!o[0].passed, "no needles → any:: is false"); + } } \ No newline at end of file