#!/usr/bin/env python3
"""
build_fill_events.py — Decision A from the synthetic-data gap report.

Walks tests/multi-agent/scenarios/*.json (43 client-day scenarios) and
data/_playbook_lessons/*.json (64 retrospective outcomes) and emits a
single normalized fill_events.parquet at data/datasets/fill_events.parquet.

Pure deterministic normalization — no LLM, no new data. Each scenario
event becomes one row. Lesson outcomes augment scenario events with
success/fail counts where (client, date, city, state) matches.

Reproducibility: identical inputs → bit-identical output. event_id is
SHA1(client|date|role|at|city) truncated to 16 hex chars; rows are
sorted by event_id before write so re-runs produce the same parquet.
"""

import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

# Repo root: this file lives at scripts/staffing/build_fill_events.py,
# so the root is two parents up.
REPO = Path(__file__).resolve().parents[2]
SCENARIO_DIR = REPO / "tests" / "multi-agent" / "scenarios"
LESSONS_DIR = REPO / "data" / "_playbook_lessons"
OUT_PATH = REPO / "data" / "datasets" / "fill_events.parquet"


def event_id(client: str, date: str, role: str, at: str, city: str) -> str:
    """Return a stable 16-hex-char row id.

    SHA1 over "client|date|role|at|city" truncated to 16 chars, per the
    contract in the module docstring. NOTE: state/kind/count are
    deliberately NOT part of the id, so two events differing only in
    those fields share an id; the stable sort in main() keeps output
    order deterministic regardless.
    """
    digest = hashlib.sha1(
        f"{client}|{date}|{role}|{at}|{city}".encode("utf-8")
    ).hexdigest()
    return digest[:16]


def load_lessons() -> dict:
    """Returns map of (client, date) → outcome dict.

    Best-effort: unparseable JSON files are skipped (with a stderr
    warning). On duplicate (client, date) keys the later file wins;
    the sorted glob makes that deterministic.
    """
    out: dict = {}
    for path in sorted(LESSONS_DIR.glob("*.json")):
        try:
            # Explicit encoding: the platform-default codec (e.g. cp1252
            # on Windows) would break the bit-identical-output guarantee.
            d = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            # Keep the best-effort skip, but leave a trace for operators.
            print(f"warning: skipping unparseable lesson {path}", file=sys.stderr)
            continue
        client = d.get("client")
        date = d.get("date")
        if not client or not date:
            continue  # outcome is unkeyable without both fields
        out[(client, date)] = {
            "outcome_events_total": d.get("events_total"),
            "outcome_events_ok": d.get("events_ok"),
            "outcome_checkpoint_count": d.get("checkpoint_count"),
            "outcome_model": d.get("model"),
            "outcome_cloud": d.get("cloud"),
            "outcome_lesson_path": str(path.relative_to(REPO)),
        }
    return out


def load_scenarios(lessons: dict) -> list[dict]:
    """Flatten scenario JSONs into one row per event.

    Each row carries the event fields, the scenario's contract terms,
    and (when a (client, date) match exists in *lessons*) the lesson
    outcome columns; unmatched rows get None outcomes via dict.get.
    Unparseable or incomplete scenario files are skipped.
    """
    rows: list[dict] = []
    for path in sorted(SCENARIO_DIR.glob("scen_*.json")):
        try:
            # Explicit encoding for reproducibility (see load_lessons).
            d = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            print(f"warning: skipping unparseable scenario {path}", file=sys.stderr)
            continue
        client = d.get("client")
        date = d.get("date")
        contract = d.get("contract") or {}
        events = d.get("events") or []
        if not client or not date or not events:
            continue  # nothing usable to emit for this scenario
        outcome = lessons.get((client, date), {})
        for event in events:
            role = event.get("role") or ""
            at = event.get("at") or ""
            city = event.get("city") or ""
            state = event.get("state") or ""
            rows.append({
                "event_id": event_id(client, date, role, at, city),
                "source_file": str(path.relative_to(REPO)),
                "source_kind": "scenario",
                "client": client,
                "date": date,
                "city": city,
                "state": state,
                "role": role,
                "count": int(event.get("count") or 0),
                "kind": event.get("kind") or "",
                "at": at,
                "shift_start": event.get("shift_start") or "",
                "contract_deadline": contract.get("deadline"),
                "contract_budget_per_hour_max": contract.get("budget_per_hour_max"),
                "contract_local_bonus_per_hour": contract.get("local_bonus_per_hour"),
                "contract_local_bonus_radius_mi": contract.get("local_bonus_radius_mi"),
                "contract_fill_requirement": contract.get("fill_requirement"),
                "outcome_events_total": outcome.get("outcome_events_total"),
                "outcome_events_ok": outcome.get("outcome_events_ok"),
                "outcome_checkpoint_count": outcome.get("outcome_checkpoint_count"),
                "outcome_model": outcome.get("outcome_model"),
                "outcome_cloud": outcome.get("outcome_cloud"),
                "outcome_lesson_path": outcome.get("outcome_lesson_path"),
            })
    return rows


def main() -> int:
    """Build and write the parquet; print a summary. Returns exit code."""
    lessons = load_lessons()
    rows = load_scenarios(lessons)
    if not rows:
        print("no rows produced — scenario dir empty?", file=sys.stderr)
        return 1
    # Deterministic row order: sort by event_id (stable sort, so rows
    # with duplicate ids keep their deterministic insertion order).
    rows.sort(key=lambda r: r["event_id"])

    # Explicit schema so re-runs never infer drifting column types.
    schema = pa.schema([
        ("event_id", pa.string()),
        ("source_file", pa.string()),
        ("source_kind", pa.string()),
        ("client", pa.string()),
        ("date", pa.string()),
        ("city", pa.string()),
        ("state", pa.string()),
        ("role", pa.string()),
        ("count", pa.int32()),
        ("kind", pa.string()),
        ("at", pa.string()),
        ("shift_start", pa.string()),
        ("contract_deadline", pa.string()),
        ("contract_budget_per_hour_max", pa.int32()),
        ("contract_local_bonus_per_hour", pa.int32()),
        ("contract_local_bonus_radius_mi", pa.int32()),
        ("contract_fill_requirement", pa.string()),
        ("outcome_events_total", pa.int32()),
        ("outcome_events_ok", pa.int32()),
        ("outcome_checkpoint_count", pa.int32()),
        ("outcome_model", pa.string()),
        ("outcome_cloud", pa.bool_()),
        ("outcome_lesson_path", pa.string()),
    ])
    table = pa.Table.from_pylist(rows, schema=schema)

    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(table, OUT_PATH, compression="snappy")

    # Rows with a matched lesson outcome (None means no lesson matched).
    matched = sum(1 for r in rows if r["outcome_events_total"] is not None)
    print(f"fill_events.parquet written: {OUT_PATH.relative_to(REPO)}")
    print(f"  rows: {len(rows)}")
    print(f"  scenarios: {len({r['source_file'] for r in rows})}")
    print(f"  with outcome: {matched}")
    print(f"  unique (client,date): {len({(r['client'], r['date']) for r in rows})}")
    print(f"  generated_at: {datetime.now(timezone.utc).isoformat(timespec='seconds')}")
    return 0


if __name__ == "__main__":
    sys.exit(main())