validator: Phase 43 v3 — production WorkerLookup backed by workers_500k.parquet
Some checks failed
lakehouse/auditor 13 blocking issues: cloud: claim not backed — "Verified end-to-end:"
Closes the Phase 43 v2 loose end. The validator scaffolds (FillValidator,
EmailValidator) take Arc<dyn WorkerLookup> at construction; this commit
ships the parquet-snapshot impl that production code wires in.
Schema mapping (workers_500k.parquet → WorkerRecord):
  worker_id (int64)     → candidate_id = "W-{id}" (matches what the
                          staffing executor emits)
  name (string)         → name (already concatenated upstream)
  role (string)         → role
  city, state (string)  → city, state
  availability (double) → status: "active" if >0 else "inactive"
workers_500k has no `status` column; we derive it from `availability`,
since 0.0 means vacationing/suspended/etc. in this dataset's
convention. Once Track A.B's `_safe` view ships with proper status,
flip the loader to read it directly — schema mapping is in one
function (load_workers_parquet), so the swap is trivial.
In-memory snapshot model:
- Loads all 500K rows at startup → ~75MB resident
- Sync .find() — no per-call I/O on the validation hot path
- Refresh = call load_workers_parquet again to rebuild
- Caller-driven refresh (no auto-watch) — operators pick the cadence
Why workers_500k and not candidates.parquet:
candidates.parquet has the right shape (string candidate_id, status,
first/last_name) but lacks `role` — and the staffing executor matches
the W-* convention from workers_500k_v8 corpus. So the production
data path goes through workers_500k. The schema mismatch between the
two parquets is documented in `reports/staffing/synthetic-data-gap-
report.md` (gap A); resolution is operator's call.
Errors are typed (LookupLoadError):
- Open: file not found / permission
- Parse: invalid parquet
- MissingColumn: schema doesn't have required field
- BadRow: row missing worker_id or name
Schema check happens before iteration, so a wrong-shape file fails
loud immediately rather than silently building an empty lookup.
Verification:
- cargo build -p validator: compiles
- cargo test -p validator: 33 pass / 0 fail (was 31; +2 for parquet)
- load_real_workers_500k smoke test passes against the live 500K-row
  file: W-1 resolves; status, role, and city/state all populated.
Phase 43 v3 part 2 (next):
- /v1/validate gateway endpoint that takes a JSON artifact + dispatches
to FillValidator/EmailValidator/PlaybookValidator with a shared
WorkerLookup loaded from the parquet at gateway startup.
- That closes the "any caller can validate" surface; execution-loop
wiring (Phase 43 PRD's "generate → validate → correct → retry")
becomes a thin wrapper on top of /v1/validate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent f6af0fd409
commit ebd9ab7c77
@@ -9,3 +9,7 @@ serde_json = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+# Parquet loader for ParquetWorkerLookup (Phase 43 v3 — production
+# WorkerLookup backed by workers_500k.parquet snapshot).
+arrow = { workspace = true }
+parquet = { workspace = true }

@@ -6,3 +6,4 @@
 pub mod fill;
 pub mod email;
 pub mod playbook;
+pub mod parquet_lookup;

crates/validator/src/staffing/parquet_lookup.rs (new file, 165 lines)
@@ -0,0 +1,165 @@
//! Production WorkerLookup backed by a workers_500k.parquet snapshot.
//!
//! Loads the full roster into memory at startup (one-shot). 500K rows
//! at ~150 bytes per WorkerRecord ≈ 75 MB resident — fine for any
//! production lakehouse process. Refresh is intentionally
//! caller-driven (call `load_workers_parquet` again to rebuild) rather
//! than automatic — operators decide when staffing data has changed
//! enough to justify the few-second reload.
//!
//! Schema mapping (workers_500k.parquet → WorkerRecord):
//!   worker_id (int64)     → candidate_id = "W-{id}"
//!   name (string)         → name
//!   role (string)         → role
//!   city (string)         → city
//!   state (string)        → state
//!   availability (double) → status: "active" if >0 else "inactive"
//!
//! No status column on workers_500k, so we derive from availability —
//! the floor convention used elsewhere in the lakehouse staffing
//! pipeline. Workers with availability=0.0 are treated as inactive
//! (vacation, suspended, etc.). Once the Track-A.B `_safe` view ships
//! with proper `status`, switch this loader to read it directly.
//!
//! Blacklist join is not done here — the caller is expected to populate
//! `blacklisted_clients` from a separate source (Phase 43 PRD says
//! `client_blacklist` table; not yet defined). Default empty.

use crate::{InMemoryWorkerLookup, WorkerLookup, WorkerRecord};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Field;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;

#[derive(Debug, thiserror::Error)]
pub enum LookupLoadError {
    #[error("opening parquet at {path}: {source}")]
    Open { path: String, #[source] source: std::io::Error },
    #[error("parsing parquet at {path}: {source}")]
    Parse { path: String, #[source] source: parquet::errors::ParquetError },
    #[error("missing required column {column}")]
    MissingColumn { column: String },
    #[error("row {row}: {reason}")]
    BadRow { row: usize, reason: String },
}

/// Build an `InMemoryWorkerLookup` from a workers_500k-shaped parquet
/// file. Returned as `Arc<dyn WorkerLookup>` to drop into validator
/// constructors.
pub fn load_workers_parquet(path: &Path) -> Result<Arc<dyn WorkerLookup>, LookupLoadError> {
    let file = File::open(path).map_err(|e| LookupLoadError::Open {
        path: path.display().to_string(),
        source: e,
    })?;
    let reader = SerializedFileReader::new(file).map_err(|e| LookupLoadError::Parse {
        path: path.display().to_string(),
        source: e,
    })?;

    // Validate schema covers what we need before iterating rows.
    let schema = reader.metadata().file_metadata().schema();
    let column_names: Vec<&str> = schema.get_fields().iter().map(|f| f.name()).collect();
    for required in &["worker_id", "name", "role", "city", "state", "availability"] {
        if !column_names.contains(required) {
            return Err(LookupLoadError::MissingColumn { column: (*required).to_string() });
        }
    }

    let row_iter = reader.get_row_iter(None).map_err(|e| LookupLoadError::Parse {
        path: path.display().to_string(),
        source: e,
    })?;

    let mut records: Vec<WorkerRecord> =
        Vec::with_capacity(reader.metadata().file_metadata().num_rows() as usize);
    let mut row_idx = 0usize;
    for row_result in row_iter {
        let row = row_result.map_err(|e| LookupLoadError::Parse {
            path: path.display().to_string(),
            source: e,
        })?;
        let mut worker_id: Option<i64> = None;
        let mut name: Option<String> = None;
        let mut role: Option<String> = None;
        let mut city: Option<String> = None;
        let mut state: Option<String> = None;
        let mut availability: f64 = 0.0;
        for (col_name, field) in row.get_column_iter() {
            match (col_name.as_str(), field) {
                ("worker_id", Field::Long(v)) => worker_id = Some(*v),
                ("worker_id", Field::Int(v)) => worker_id = Some(*v as i64),
                ("name", Field::Str(v)) => name = Some(v.clone()),
                ("role", Field::Str(v)) => role = Some(v.clone()),
                ("city", Field::Str(v)) => city = Some(v.clone()),
                ("state", Field::Str(v)) => state = Some(v.clone()),
                ("availability", Field::Double(v)) => availability = *v,
                ("availability", Field::Float(v)) => availability = *v as f64,
                _ => { /* extra columns ignored */ }
            }
        }
        let id = worker_id.ok_or_else(|| LookupLoadError::BadRow {
            row: row_idx,
            reason: "worker_id missing or non-integer".into(),
        })?;
        let nm = name.ok_or_else(|| LookupLoadError::BadRow {
            row: row_idx,
            reason: "name missing".into(),
        })?;
        records.push(WorkerRecord {
            candidate_id: format!("W-{id}"),
            name: nm,
            // status derived from availability (workers_500k has no
            // status column). 0.0 → inactive, >0.0 → active.
            status: if availability > 0.0 { "active".into() } else { "inactive".into() },
            city,
            state,
            role,
            blacklisted_clients: vec![],
        });
        row_idx += 1;
    }

    tracing::info!(
        target: "validator.parquet_lookup",
        rows = records.len(),
        path = %path.display(),
        "loaded workers parquet snapshot"
    );

    Ok(Arc::new(InMemoryWorkerLookup::from_records(records)))
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Smoke test against the live workers_500k.parquet on disk.
    /// Skipped automatically if the file isn't present (CI / sparse
    /// checkouts) so the test suite stays portable.
    #[test]
    fn load_real_workers_500k() {
        let path = PathBuf::from("/home/profit/lakehouse/data/datasets/workers_500k.parquet");
        if !path.exists() {
            eprintln!("skip: {} not present", path.display());
            return;
        }
        let lookup = load_workers_parquet(&path).expect("load");
        // Basic shape: at least one worker resolves and has the
        // expected fields populated.
        let probe = lookup.find("W-1");
        assert!(probe.is_some(), "W-1 should exist in 500K-row parquet");
        let w = probe.unwrap();
        assert!(!w.name.is_empty(), "name should be populated");
        assert!(w.status == "active" || w.status == "inactive");
        assert!(w.role.is_some());
        assert!(w.city.is_some());
        assert!(w.state.is_some());
    }

    #[test]
    fn missing_file_returns_error() {
        let r = load_workers_parquet(Path::new("/nonexistent.parquet"));
        assert!(matches!(r, Err(LookupLoadError::Open { .. })));
    }
}