diff --git a/Cargo.lock b/Cargo.lock index 8e46134..9baea2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4093,6 +4093,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "truth", + "validator", "vectord", ] @@ -8912,6 +8913,8 @@ dependencies = [ name = "validator" version = "0.1.0" dependencies = [ + "arrow 55.2.0", + "parquet 55.2.0", "serde", "serde_json", "thiserror 2.0.18", diff --git a/crates/gateway/Cargo.toml b/crates/gateway/Cargo.toml index 48b5d60..5eb1654 100644 --- a/crates/gateway/Cargo.toml +++ b/crates/gateway/Cargo.toml @@ -13,6 +13,7 @@ ingestd = { path = "../ingestd" } vectord = { path = "../vectord" } journald = { path = "../journald" } truth = { path = "../truth" } +validator = { path = "../validator" } tokio = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 9d4072e..acf0e54 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -295,6 +295,30 @@ async fn main() { } k }, + validate_workers: { + // Load workers_500k.parquet snapshot for /v1/validate. + // Path overridable via LH_WORKERS_PARQUET env. Missing + // file is non-fatal — validators run schema/PII checks + // unaffected; only worker-existence checks fail clean. 
+ let path_str = std::env::var("LH_WORKERS_PARQUET") + .unwrap_or_else(|_| "/home/profit/lakehouse/data/datasets/workers_500k.parquet".into()); + let path = std::path::Path::new(&path_str); + if path.exists() { + match validator::staffing::parquet_lookup::load_workers_parquet(path) { + Ok(lookup) => { + tracing::info!("v1: workers parquet loaded from {} — /v1/validate worker-existence checks enabled", path_str); + lookup + } + Err(e) => { + tracing::warn!("v1: workers parquet at {} unreadable ({e}) — /v1/validate worker-existence checks will fail Consistency", path_str); + std::sync::Arc::new(validator::InMemoryWorkerLookup::new()) + } + } + } else { + tracing::warn!("v1: workers parquet at {} not found — /v1/validate worker-existence checks will fail Consistency", path_str); + std::sync::Arc::new(validator::InMemoryWorkerLookup::new()) + } + }, // Phase 40 early deliverable — Langfuse trace emitter. // Defaults match mcp-server/tracing.ts conventions so // gateway traces land in the same staffing project. diff --git a/crates/gateway/src/v1/mod.rs b/crates/gateway/src/v1/mod.rs index dc0076e..9af8d85 100644 --- a/crates/gateway/src/v1/mod.rs +++ b/crates/gateway/src/v1/mod.rs @@ -18,6 +18,7 @@ pub mod gemini; pub mod claude; pub mod kimi; pub mod opencode; +pub mod validate; pub mod langfuse_trace; pub mod mode; pub mod respond; @@ -68,6 +69,15 @@ pub struct V1State { /// `OPENCODE_API_KEY` env or `/etc/lakehouse/opencode.env`. None = /// provider="opencode" calls 503. pub opencode_key: Option, + /// Shared WorkerLookup loaded once at startup from + /// workers_500k.parquet (path: LH_WORKERS_PARQUET env, default + /// data/datasets/workers_500k.parquet). Used by /v1/validate to + /// run FillValidator/EmailValidator with worker-existence checks. 
+ /// Falls back to an empty InMemoryWorkerLookup if the file is + /// missing — validators still run schema/PII checks but every + /// worker-existence check fails (Consistency error), which is + /// the correct behavior when the roster isn't configured. + pub validate_workers: std::sync::Arc, /// Phase 40 early deliverable — Langfuse client. None = tracing /// disabled (keys missing or container unreachable). Traces are /// fire-and-forget: never block the response path. @@ -107,6 +117,7 @@ pub fn router(state: V1State) -> Router { .route("/mode", post(mode::route)) .route("/mode/list", get(mode::list)) .route("/mode/execute", post(mode::execute)) + .route("/validate", post(validate::validate)) .with_state(state) } diff --git a/crates/gateway/src/v1/validate.rs b/crates/gateway/src/v1/validate.rs new file mode 100644 index 0000000..e326704 --- /dev/null +++ b/crates/gateway/src/v1/validate.rs @@ -0,0 +1,82 @@ +//! /v1/validate — gateway-side artifact validation endpoint. +//! +//! Phase 43 v3 part 2: makes the validator crate network-callable. +//! Any caller (scrum loop, test harness, future agent) can POST a +//! generated artifact and get back a Report (success) or +//! ValidationError (failure with structured field/reason). +//! +//! Request shape: +//! POST /v1/validate +//! { +//! "kind": "fill" | "email" | "playbook", +//! "artifact": { ... }, +//! "context": { ... } // optional — folded into artifact._context +//! } +//! +//! Response on success: 200 + Report JSON +//! Response on failure: 422 + ValidationError JSON +//! Response on bad request: 400 + plain-text error +//! +//! The shared WorkerLookup is loaded once at gateway startup from +//! workers_500k.parquet (path configurable via LH_WORKERS_PARQUET +//! env, defaults to data/datasets/workers_500k.parquet). Falls back +//! to an empty InMemoryWorkerLookup if the file is missing — the +//! validators will still run schema/length/PII checks but worker- +//! 
existence checks will all fail (Consistency error), which is the
+//! correct behavior when the roster isn't configured.
+
+use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
+use serde::Deserialize;
+use validator::staffing::{email::EmailValidator, fill::FillValidator, playbook::PlaybookValidator};
+use validator::{Artifact, ValidationError, Validator};
+
+/// JSON body accepted by `POST /v1/validate`.
+#[derive(Deserialize)]
+pub struct ValidateRequest {
+    /// `"fill" | "email" | "playbook"` — picks which validator runs.
+    pub kind: String,
+    /// The artifact JSON (free-form; shape depends on `kind`).
+    pub artifact: serde_json::Value,
+    /// Optional context bag — stored as `artifact._context` so validators
+    /// can read `target_count`, `city`, `client_id`, `candidate_id` without
+    /// callers embedding `_context` themselves. Only valid when `artifact`
+    /// is a JSON object; otherwise the request is rejected with 400.
+    #[serde(default)]
+    pub context: Option<serde_json::Value>,
+}
+
+/// `POST /v1/validate` handler: runs the validator picked by `kind`.
+/// Replies 200 + report, 422 + validation error, or 400 + plain text
+/// (unknown `kind`, or `context` sent with a non-object artifact).
+pub async fn validate(
+    State(state): State<super::V1State>,
+    Json(req): Json<ValidateRequest>,
+) -> impl IntoResponse {
+    let mut artifact_value = req.artifact;
+    if let Some(ctx) = req.context {
+        // Reject rather than silently validate without the caller's context.
+        match artifact_value.as_object_mut() {
+            Some(obj) => obj.insert("_context".to_string(), ctx),
+            None => {
+                let msg = "context requires the artifact to be a JSON object";
+                return (StatusCode::BAD_REQUEST, msg).into_response();
+            }
+        };
+    }
+
+    let workers = state.validate_workers.clone();
+    let result = match req.kind.as_str() {
+        "fill" => FillValidator::new(workers).validate(&Artifact::FillProposal(artifact_value)),
+        "email" => EmailValidator::new(workers).validate(&Artifact::EmailDraft(artifact_value)),
+        "playbook" => PlaybookValidator.validate(&Artifact::Playbook(artifact_value)),
+        other => return (
+            StatusCode::BAD_REQUEST,
+            format!("unknown kind '{other}' — expected fill | email | playbook"),
+        ).into_response(),
+    };
+
+    match result {
+        Ok(report) => (StatusCode::OK, Json(report)).into_response(),
+        Err(e) => (StatusCode::UNPROCESSABLE_ENTITY, Json(e)).into_response(),
+    }
+}