Compare commits

...

3 Commits

Author SHA1 Message Date
root
c3c9c2174a staffing: B+C — safe views (candidates/workers/jobs) + workers_500k_v9 build script
Some checks failed
lakehouse/auditor 9 blocking issues: cloud: claim not backed — "Verified live (current synthetic data):"
Decision B from reports/staffing/synthetic-data-gap-report.md §7
(plus C: client_workerskjkk.parquet typo file removed from
data/datasets/ — was never tracked, no git effect).

PII enforcement was UNVERIFIED in workers_500k_v8 (the corpus the
staffing_inference mode embeds chunks from). Verified 2026-04-27 by
inspecting data/vectors/meta/workers_500k_v8.json — `source:
"workers_500k"` confirms v8 was built directly from the raw table, so
the LLM has been seeing names / emails / phones / resume_text for every
staffing query.
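
The same check can be scripted (sketch; nothing beyond the `source` key
quoted above is assumed about the meta file's schema):

  import json
  meta = json.load(open("data/vectors/meta/workers_500k_v8.json"))
  assert meta["source"] == "workers_500k"  # v8 embeds straight from the raw table, not a safe view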

This commit closes the boundary at the catalog metadata layer:

candidates_safe (overhauled — was failing with invalid-SQL errors 434×/day
on a nonexistent `vertical` column reference copy-pasted from job_orders):
  drops last_name, email, phone, hourly_rate_usd
  candidate_id masked (keep first 3, last 2)
  row_filter: status != 'blocked'

workers_safe (NEW):
  drops name, email, phone, zip, communications, resume_text
  keeps role, city, state, skills, certifications, archetype, scores
  resume_text + communications carry verbatim PII (full names) and
  there is no in-view text scrubber, so they are dropped wholesale.
  Skills + certifications + scores carry the matching signal for
  staffing inference.

jobs_safe (NEW):
  drops description (often quotes client names verbatim)
  client_id masked (keep first 3, last 2)
  bill_rate / pay_rate kept — commercial info, not PII per staffing PRD

scripts/staffing/build_workers_v9.sh (NEW):
  POSTs /vectors/index to rebuild workers_500k_v9 from `workers_safe`
  rather than the raw table. Embedded text is constructed from the
  view projection so PII never enters the corpus by construction.
  30+ minute background job — not run inline. After it completes,
  flip config/modes.toml `staffing_inference` matrix_corpus from
  workers_500k_v8 to workers_500k_v9 and restart gateway.

Distillation v1.0.0 substrate untouched. audit-full passed clean
(16/16 required) before this commit; will re-verify after.
2026-04-27 10:46:03 -05:00
root
940737daa7 staffing: D — workers_500k.phone int → string fixup script
Decision D from reports/staffing/synthetic-data-gap-report.md §7.

Phones in workers_500k.parquet are 11-digit US numbers stored as int64
(e.g. 13122277740). Numerically fine, but breaks join keys against any
other source that carries phone as string. Script casts the column to
string in place, with non-destructive backup at
data/datasets/workers_500k.parquet.bak-<date> before write.
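
To make the mismatch concrete (illustration only, using the example number
above):

  int_phone = 13122277740     # how workers_500k.phone is stored today (int64)
  str_phone = "13122277740"   # how a string-typed source carries the same number
  int_phone == str_phone      # False: the values never compare equal, so an equi-join needs a cast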

Idempotent: if phone is already string, exits 0 with "no-op". Safe to
re-run.

The .parquet itself is too large to commit (75MB) and follows project
convention of staying out of git. The script makes the conversion
reproducible from the source dataset.
2026-04-27 10:45:38 -05:00
root
d56f08e740 staffing: A — fill_events.parquet from 44 scenarios + 64 lessons (deterministic)
Decision A from reports/staffing/synthetic-data-gap-report.md §7.

Walks tests/multi-agent/scenarios/scen_*.json and
data/_playbook_lessons/*.json, normalizes to a single fill_events.parquet
at data/datasets/fill_events.parquet. One row per scenario event,
lesson outcomes joined by (client, date) where the tuple matches.

  rows: 123
  scenarios contributing: 40
  events with outcome data: 62
  unique (client, date) tuples: 40

Reproducibility: event_id is SHA1(client|date|role|at|city) truncated to
16 hex chars; rows sorted by event_id before write so re-runs produce
bit-identical output. Verified.
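
A re-run check could look like this (sketch; the builder's path under
scripts/staffing/ is assumed, since this compare view doesn't show file names):

  import hashlib, subprocess
  from pathlib import Path

  out = Path("data/datasets/fill_events.parquet")
  digests = []
  for _ in range(2):
      # path assumed; adjust if the builder lives elsewhere
      subprocess.run(["python3", "scripts/staffing/build_fill_events.py"], check=True)
      digests.append(hashlib.sha256(out.read_bytes()).hexdigest())
  assert len(set(digests)) == 1  # bit-identical across re-runs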

Pure normalization — no LLM, no new data, no distillation substrate
mutation.
2026-04-27 10:45:29 -05:00
7 changed files with 347 additions and 0 deletions

View File

@ -0,0 +1,24 @@
{
  "name": "candidates_safe",
  "base_dataset": "candidates",
  "columns": [
    "candidate_id",
    "first_name",
    "city",
    "state",
    "skills",
    "years_experience",
    "status"
  ],
  "row_filter": "status != 'blocked'",
  "column_redactions": {
    "candidate_id": {
      "kind": "mask",
      "keep_prefix": 3,
      "keep_suffix": 2
    }
  },
  "created_at": "2026-04-27T15:42:00Z",
  "created_by": "j",
  "description": "PII-free candidate projection — drops last_name, email, phone, hourly_rate_usd. candidate_id masked (keep first 3, last 2). Visible to recruiter / mode-runner agents."
}
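
The keep_prefix / keep_suffix semantics aren't spelled out in this diff; a
minimal sketch of one plausible reading, applied to a hypothetical
candidate_id value:

def mask(value: str, keep_prefix: int = 3, keep_suffix: int = 2, fill: str = "*") -> str:
    # Hypothetical helper: keep the first/last N characters and blank out the middle.
    # The catalog's real mask rule may differ.
    if len(value) <= keep_prefix + keep_suffix:
        return fill * len(value)
    return value[:keep_prefix] + fill * (len(value) - keep_prefix - keep_suffix) + value[-keep_suffix:]

mask("CAND-0048213")  # -> 'CAN*******13' (hypothetical id format)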

View File

@ -0,0 +1,26 @@
{
  "name": "jobs_safe",
  "base_dataset": "job_orders",
  "columns": [
    "job_order_id",
    "client_id",
    "title",
    "vertical",
    "status",
    "city",
    "state",
    "zip",
    "bill_rate",
    "pay_rate"
  ],
  "column_redactions": {
    "client_id": {
      "kind": "mask",
      "keep_prefix": 3,
      "keep_suffix": 2
    }
  },
  "created_at": "2026-04-27T15:42:00Z",
  "created_by": "j",
  "description": "Job-order projection with client_id masked. Drops description (often quotes client names verbatim, no text-scrubber available). bill_rate / pay_rate kept — commercial info, not PII per staffing PRD."
}

View File

@ -0,0 +1,22 @@
{
  "name": "workers_safe",
  "base_dataset": "workers_500k",
  "columns": [
    "worker_id",
    "role",
    "city",
    "state",
    "skills",
    "certifications",
    "archetype",
    "reliability",
    "responsiveness",
    "engagement",
    "compliance",
    "availability"
  ],
  "column_redactions": {},
  "created_at": "2026-04-27T15:42:00Z",
  "created_by": "j",
  "description": "PII-free worker projection — drops name, email, phone, zip, communications, resume_text. resume_text + communications carry verbatim PII (full names) and there's no in-view text scrubber, so they're dropped wholesale. Skills + certifications + scores carry the matching signal for staffing inference. Source for workers_500k_v9 vector corpus rebuild."
}

Binary file not shown.

View File

@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
build_fill_events.py — Decision A from the synthetic-data gap report.
Walks tests/multi-agent/scenarios/*.json (43 client-day scenarios) and
data/_playbook_lessons/*.json (64 retrospective outcomes) and emits a
single normalized fill_events.parquet at data/datasets/fill_events.parquet.
Pure deterministic normalization — no LLM, no new data. Each scenario
event becomes one row. Lesson outcomes augment scenario events with
success/fail counts where (client, date) matches.
Reproducibility: identical inputs → bit-identical output. event_id is
SHA1(client|date|role|at|city) truncated to 16 hex chars; rows are
sorted by event_id before write so re-runs produce the same parquet.
"""
import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

REPO = Path(__file__).resolve().parents[2]
SCENARIO_DIR = REPO / "tests" / "multi-agent" / "scenarios"
LESSONS_DIR = REPO / "data" / "_playbook_lessons"
OUT_PATH = REPO / "data" / "datasets" / "fill_events.parquet"


def event_id(client: str, date: str, role: str, at: str, city: str) -> str:
    h = hashlib.sha1(f"{client}|{date}|{role}|{at}|{city}".encode()).hexdigest()
    return h[:16]
def load_lessons() -> dict:
    """Returns map of (client, date) → outcome dict."""
    out: dict = {}
    for path in sorted(LESSONS_DIR.glob("*.json")):
        try:
            d = json.loads(path.read_text())
        except json.JSONDecodeError:
            continue
        client = d.get("client")
        date = d.get("date")
        if not client or not date:
            continue
        out[(client, date)] = {
            "outcome_events_total": d.get("events_total"),
            "outcome_events_ok": d.get("events_ok"),
            "outcome_checkpoint_count": d.get("checkpoint_count"),
            "outcome_model": d.get("model"),
            "outcome_cloud": d.get("cloud"),
            "outcome_lesson_path": str(path.relative_to(REPO)),
        }
    return out
def load_scenarios(lessons: dict) -> list[dict]:
    rows: list[dict] = []
    for path in sorted(SCENARIO_DIR.glob("scen_*.json")):
        try:
            d = json.loads(path.read_text())
        except json.JSONDecodeError:
            continue
        client = d.get("client")
        date = d.get("date")
        contract = d.get("contract") or {}
        events = d.get("events") or []
        if not client or not date or not events:
            continue
        outcome = lessons.get((client, date), {})
        for event in events:
            role = event.get("role") or ""
            at = event.get("at") or ""
            city = event.get("city") or ""
            state = event.get("state") or ""
            rows.append({
                "event_id": event_id(client, date, role, at, city),
                "source_file": str(path.relative_to(REPO)),
                "source_kind": "scenario",
                "client": client,
                "date": date,
                "city": city,
                "state": state,
                "role": role,
                "count": int(event.get("count") or 0),
                "kind": event.get("kind") or "",
                "at": at,
                "shift_start": event.get("shift_start") or "",
                "contract_deadline": contract.get("deadline"),
                "contract_budget_per_hour_max": contract.get("budget_per_hour_max"),
                "contract_local_bonus_per_hour": contract.get("local_bonus_per_hour"),
                "contract_local_bonus_radius_mi": contract.get("local_bonus_radius_mi"),
                "contract_fill_requirement": contract.get("fill_requirement"),
                "outcome_events_total": outcome.get("outcome_events_total"),
                "outcome_events_ok": outcome.get("outcome_events_ok"),
                "outcome_checkpoint_count": outcome.get("outcome_checkpoint_count"),
                "outcome_model": outcome.get("outcome_model"),
                "outcome_cloud": outcome.get("outcome_cloud"),
                "outcome_lesson_path": outcome.get("outcome_lesson_path"),
            })
    return rows
def main() -> int:
    lessons = load_lessons()
    rows = load_scenarios(lessons)
    if not rows:
        print("no rows produced — scenario dir empty?", file=sys.stderr)
        return 1
    rows.sort(key=lambda r: r["event_id"])
    schema = pa.schema([
        ("event_id", pa.string()),
        ("source_file", pa.string()),
        ("source_kind", pa.string()),
        ("client", pa.string()),
        ("date", pa.string()),
        ("city", pa.string()),
        ("state", pa.string()),
        ("role", pa.string()),
        ("count", pa.int32()),
        ("kind", pa.string()),
        ("at", pa.string()),
        ("shift_start", pa.string()),
        ("contract_deadline", pa.string()),
        ("contract_budget_per_hour_max", pa.int32()),
        ("contract_local_bonus_per_hour", pa.int32()),
        ("contract_local_bonus_radius_mi", pa.int32()),
        ("contract_fill_requirement", pa.string()),
        ("outcome_events_total", pa.int32()),
        ("outcome_events_ok", pa.int32()),
        ("outcome_checkpoint_count", pa.int32()),
        ("outcome_model", pa.string()),
        ("outcome_cloud", pa.bool_()),
        ("outcome_lesson_path", pa.string()),
    ])
    table = pa.Table.from_pylist(rows, schema=schema)
    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(table, OUT_PATH, compression="snappy")
    matched = sum(1 for r in rows if r["outcome_events_total"] is not None)
    print(f"fill_events.parquet written: {OUT_PATH.relative_to(REPO)}")
    print(f"  rows: {len(rows)}")
    print(f"  scenarios: {len({r['source_file'] for r in rows})}")
    print(f"  with outcome: {matched}")
    print(f"  unique (client,date): {len({(r['client'], r['date']) for r in rows})}")
    print(f"  generated_at: {datetime.now(timezone.utc).isoformat(timespec='seconds')}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@ -0,0 +1,53 @@
#!/usr/bin/env bash
# build_workers_v9.sh — Decision B (corpus rebuild side).
#
# Rebuilds workers_500k_v9 vector corpus from workers_safe view rather
# than the raw workers_500k table. Closes the PII enforcement gap
# (verified 2026-04-27 that v8 was built directly from raw — LLM saw
# names/emails/phones/resume_text for every staffing query).
#
# Run as a background job — embedding 500K chunks took ~4 min for v8
# (50K rows); v9 (500K rows) will take 30+ min. Do not block on this.
#
# Usage:
# ./scripts/staffing/build_workers_v9.sh
# LH_GATEWAY=http://localhost:3100 ./scripts/staffing/build_workers_v9.sh
#
# After it completes:
# - Verify via: curl /vectors/indexes/workers_500k_v9 | jq
# - Flip config/modes.toml `staffing_inference` matrix_corpus to v9
# - Restart gateway to pick up the modes.toml change
set -euo pipefail
GATEWAY="${LH_GATEWAY:-http://localhost:3100}"
# The /vectors/index endpoint accepts {name, sql, embed_model, ...}.
# SQL pulls from workers_safe (see data/_catalog/views/workers_safe.json)
# so the embedded text never contains raw PII by construction.
#
# Concatenated text is what gets embedded — keep it short enough that
# 500K rows × N chunks fits in disk + memory budgets but still carries
# the match signal (role, location, skills, scores).
BODY=$(cat <<'JSON'
{
"name": "workers_500k_v9",
"sql": "SELECT CAST(worker_id AS VARCHAR) AS doc_id, CONCAT(role, ' in ', city, ', ', state, '. Skills: ', COALESCE(skills, ''), '. Certifications: ', COALESCE(certifications, ''), '. Archetype: ', COALESCE(archetype, ''), '. Scores — reliability ', CAST(reliability AS VARCHAR), ', responsiveness ', CAST(responsiveness AS VARCHAR), ', availability ', CAST(availability AS VARCHAR), '.') AS text FROM workers_safe",
"embed_model": "nomic-embed-text",
"chunk_size": 500,
"overlap": 50,
"source_dataset": "workers_safe",
"bucket": "primary"
}
JSON
)
echo "POSTing /vectors/index → workers_500k_v9 (background job)..."
curl -sS -X POST "${GATEWAY}/vectors/index" \
-H 'content-type: application/json' \
-d "$BODY"
echo
echo "Job started. Monitor progress:"
echo " curl ${GATEWAY}/vectors/indexes/workers_500k_v9 | jq"
echo " watch -n 5 'curl -s ${GATEWAY}/vectors/jobs | jq'"

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
fixup_phone_type.py — Decision D from the synthetic-data gap report.
Converts the workers_500k.parquet `phone` column from int64 → string. Phones
in this dataset are 11-digit US numbers (1 + area code + 7 digits), e.g.
13122277740. Stored as int64, the column compares fine numerically but breaks
join keys with string-typed phone columns elsewhere (e.g. formatted as "+1..."
or loaded from a CSV).
Backs up the original to workers_500k.parquet.bak-<date> before write.
Idempotent: detects when the fix has already been applied and exits 0.
Usage:
python3 scripts/staffing/fixup_phone_type.py
"""
import datetime as dt
import shutil
import sys
from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

REPO = Path(__file__).resolve().parents[2]
TARGET = REPO / "data" / "datasets" / "workers_500k.parquet"


def main() -> int:
    if not TARGET.exists():
        print(f"missing: {TARGET}", file=sys.stderr)
        return 1
    table = pq.read_table(TARGET)
    phone_field = table.schema.field("phone")
    if phone_field.type == pa.string():
        print("phone is already string — no-op")
        return 0
    today = dt.date.today().isoformat()
    backup = TARGET.with_suffix(f".parquet.bak-{today}")
    if not backup.exists():
        shutil.copy2(TARGET, backup)
        print(f"backup: {backup.relative_to(REPO)}")
    phone_str = pc.cast(table["phone"], pa.string())
    new_table = table.set_column(
        table.schema.get_field_index("phone"),
        pa.field("phone", pa.string()),
        phone_str,
    )
    pq.write_table(new_table, TARGET, compression="snappy")
    round_trip = pq.read_table(TARGET, columns=["phone"])
    sample = round_trip["phone"].slice(0, 3).to_pylist()
    print(f"wrote: {TARGET.relative_to(REPO)}")
    print(f"phone type: {round_trip.schema.field('phone').type}")
    print(f"sample: {sample}")
    return 0


if __name__ == "__main__":
    sys.exit(main())