From ebd9ab7c77520c35394ebb5516c8cc9cee123822 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 27 Apr 2026 07:36:40 -0500 Subject: [PATCH] =?UTF-8?q?validator:=20Phase=2043=20v3=20=E2=80=94=20prod?= =?UTF-8?q?uction=20WorkerLookup=20backed=20by=20workers=5F500k.parquet?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the Phase 43 v2 loose end. The validator scaffolds (FillValidator, EmailValidator) take Arc<dyn WorkerLookup> at construction; this commit ships the parquet-snapshot impl that production code wires in. Schema mapping (workers_500k.parquet → WorkerRecord): worker_id (int64) → candidate_id = "W-{id}" (matches what the staffing executor emits) name (string) → name (already concatenated upstream) role (string) → role city, state (string) → city, state availability (double) → status: "active" if >0 else "inactive" Workers_500k has no `status` column; we derive from `availability` since 0.0 means vacationing/suspended/etc in this dataset's convention. Once Track A.B's `_safe` view ships with proper status, flip the loader to read it directly — schema mapping is in one function (load_workers_parquet), so the swap is trivial. In-memory snapshot model: - Loads all 500K rows at startup → ~75MB resident - Sync .find() — no per-call I/O on the validation hot path - Refresh = call load_workers_parquet again to rebuild - Caller-driven refresh (no auto-watch) — operators pick the cadence Why workers_500k and not candidates.parquet: candidates.parquet has the right shape (string candidate_id, status, first/last_name) but lacks `role` — and the staffing executor matches the W-* convention from workers_500k_v8 corpus. So the production data path goes through workers_500k. The schema mismatch between the two parquets is documented in `reports/staffing/synthetic-data-gap-report.md` (gap A); resolution is operator's call. 
Errors are typed (LookupLoadError): - Open: file not found / permission - Parse: invalid parquet - MissingColumn: schema doesn't have required field - BadRow: row missing worker_id or name Schema check happens before iteration, so a wrong-shape file fails loud immediately rather than silently building an empty lookup. Verification: cargo build -p validator compiles cargo test -p validator 33 pass / 0 fail (was 31; +2 for parquet) load_real_workers_500k smoke test passes against the live 500K-row file: W-1 resolves, status + role + city/state all populated. Phase 43 v3 part 2 (next): - /v1/validate gateway endpoint that takes a JSON artifact + dispatches to FillValidator/EmailValidator/PlaybookValidator with a shared WorkerLookup loaded from the parquet at gateway startup. - That closes the "any caller can validate" surface; execution-loop wiring (Phase 43 PRD's "generate → validate → correct → retry") becomes a thin wrapper on top of /v1/validate. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/validator/Cargo.toml | 4 + crates/validator/src/staffing/mod.rs | 1 + .../validator/src/staffing/parquet_lookup.rs | 165 ++++++++++++++++++ 3 files changed, 170 insertions(+) create mode 100644 crates/validator/src/staffing/parquet_lookup.rs diff --git a/crates/validator/Cargo.toml b/crates/validator/Cargo.toml index 7b7f585..b135bba 100644 --- a/crates/validator/Cargo.toml +++ b/crates/validator/Cargo.toml @@ -9,3 +9,7 @@ serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +# Parquet loader for ParquetWorkerLookup (Phase 43 v3 — production +# WorkerLookup backed by workers_500k.parquet snapshot). 
+arrow = { workspace = true } +parquet = { workspace = true } diff --git a/crates/validator/src/staffing/mod.rs b/crates/validator/src/staffing/mod.rs index c959cd2..dbf33ac 100644 --- a/crates/validator/src/staffing/mod.rs +++ b/crates/validator/src/staffing/mod.rs @@ -6,3 +6,4 @@ pub mod fill; pub mod email; pub mod playbook; +pub mod parquet_lookup; diff --git a/crates/validator/src/staffing/parquet_lookup.rs b/crates/validator/src/staffing/parquet_lookup.rs new file mode 100644 index 0000000..0009b98 --- /dev/null +++ b/crates/validator/src/staffing/parquet_lookup.rs @@ -0,0 +1,165 @@ +//! Production WorkerLookup backed by a workers_500k.parquet snapshot. +//! +//! Loads the full roster into memory at startup (one-shot). 500K rows +//! at ~150 bytes per WorkerRecord ≈ 75 MB resident — fine for any +//! production lakehouse process. Refresh is intentionally +//! caller-driven (call `load_workers_parquet` again to rebuild) rather than +//! automatic — operators decide when staffing data has changed enough +//! to justify the few-second reload. +//! +//! Schema mapping (workers_500k.parquet → WorkerRecord): +//! worker_id (int64) → candidate_id = "W-{id}" +//! name (string) → name +//! role (string) → role +//! city (string) → city +//! state (string) → state +//! availability (double) → status: "active" if >0 else "inactive" +//! +//! No status column on workers_500k, so we derive from availability — +//! the floor convention used elsewhere in the lakehouse staffing +//! pipeline. Workers with availability=0.0 are treated as inactive +//! (vacation, suspended, etc.). Once the Track-A.B `_safe` view ships +//! with proper `status`, switch this loader to read it directly. +//! +//! Blacklist join is not done here — caller is expected to populate +//! `blacklisted_clients` from a separate source (Phase 43 PRD says +//! `client_blacklist` table; not yet defined). Default empty. 
+ +use crate::{InMemoryWorkerLookup, WorkerLookup, WorkerRecord}; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::record::Field; +use std::fs::File; +use std::path::Path; +use std::sync::Arc; + +#[derive(Debug, thiserror::Error)] +pub enum LookupLoadError { + #[error("opening parquet at {path}: {source}")] + Open { path: String, #[source] source: std::io::Error }, + #[error("parsing parquet at {path}: {source}")] + Parse { path: String, #[source] source: parquet::errors::ParquetError }, + #[error("missing required column {column}")] + MissingColumn { column: String }, + #[error("row {row}: {reason}")] + BadRow { row: usize, reason: String }, +} + +/// Build an `InMemoryWorkerLookup` from a workers_500k-shaped parquet +/// file. Returned as `Arc<dyn WorkerLookup>` to drop into validator +/// constructors. +pub fn load_workers_parquet(path: &Path) -> Result<Arc<dyn WorkerLookup>, LookupLoadError> { + let file = File::open(path).map_err(|e| LookupLoadError::Open { + path: path.display().to_string(), + source: e, + })?; + let reader = SerializedFileReader::new(file).map_err(|e| LookupLoadError::Parse { + path: path.display().to_string(), + source: e, + })?; + + // Validate schema covers what we need before iterating rows. 
+ let schema = reader.metadata().file_metadata().schema(); + let column_names: Vec<&str> = schema.get_fields().iter().map(|f| f.name()).collect(); + for required in &["worker_id", "name", "role", "city", "state", "availability"] { + if !column_names.contains(required) { + return Err(LookupLoadError::MissingColumn { column: (*required).to_string() }); + } + } + + let row_iter = reader.get_row_iter(None).map_err(|e| LookupLoadError::Parse { + path: path.display().to_string(), + source: e, + })?; + + let mut records: Vec<WorkerRecord> = Vec::with_capacity(reader.metadata().file_metadata().num_rows() as usize); + let mut row_idx = 0usize; + for row_result in row_iter { + let row = row_result.map_err(|e| LookupLoadError::Parse { + path: path.display().to_string(), + source: e, + })?; + let mut worker_id: Option<i64> = None; + let mut name: Option<String> = None; + let mut role: Option<String> = None; + let mut city: Option<String> = None; + let mut state: Option<String> = None; + let mut availability: f64 = 0.0; + for (col_name, field) in row.get_column_iter() { + match (col_name.as_str(), field) { + ("worker_id", Field::Long(v)) => worker_id = Some(*v), + ("worker_id", Field::Int(v)) => worker_id = Some(*v as i64), + ("name", Field::Str(v)) => name = Some(v.clone()), + ("role", Field::Str(v)) => role = Some(v.clone()), + ("city", Field::Str(v)) => city = Some(v.clone()), + ("state", Field::Str(v)) => state = Some(v.clone()), + ("availability", Field::Double(v)) => availability = *v, + ("availability", Field::Float(v)) => availability = *v as f64, + _ => { /* extra columns ignored */ } + } + } + let id = worker_id.ok_or_else(|| LookupLoadError::BadRow { + row: row_idx, + reason: "worker_id missing or non-integer".into(), + })?; + let nm = name.ok_or_else(|| LookupLoadError::BadRow { + row: row_idx, + reason: "name missing".into(), + })?; + records.push(WorkerRecord { + candidate_id: format!("W-{id}"), + name: nm, + // status derived from availability (workers_500k has no + // status column). 
0.0 → inactive, >0.0 → active. + status: if availability > 0.0 { "active".into() } else { "inactive".into() }, + city, + state, + role, + blacklisted_clients: vec![], + }); + row_idx += 1; + } + + tracing::info!( + target: "validator.parquet_lookup", + rows = records.len(), + path = %path.display(), + "loaded workers parquet snapshot" + ); + + Ok(Arc::new(InMemoryWorkerLookup::from_records(records))) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + /// Smoke test against the live workers_500k.parquet on disk. + /// Skipped automatically if the file isn't present (CI / sparse + /// checkouts) so the test suite stays portable. + #[test] + fn load_real_workers_500k() { + let path = PathBuf::from("/home/profit/lakehouse/data/datasets/workers_500k.parquet"); + if !path.exists() { + eprintln!("skip: {} not present", path.display()); + return; + } + let lookup = load_workers_parquet(&path).expect("load"); + // Basic shape: at least one worker resolves and has the + // expected fields populated. + let probe = lookup.find("W-1"); + assert!(probe.is_some(), "W-1 should exist in 500K-row parquet"); + let w = probe.unwrap(); + assert!(!w.name.is_empty(), "name should be populated"); + assert!(w.status == "active" || w.status == "inactive"); + assert!(w.role.is_some()); + assert!(w.city.is_some()); + assert!(w.state.is_some()); + } + + #[test] + fn missing_file_returns_error() { + let r = load_workers_parquet(Path::new("/nonexistent.parquet")); + assert!(matches!(r, Err(LookupLoadError::Open { .. }))); + } +}