#!/usr/bin/env python3
"""
build_fill_events.py — Decision A from the synthetic-data gap report.

Walks tests/multi-agent/scenarios/scen_*.json (43 client-day scenarios) and
data/_playbook_lessons/*.json (64 retrospective outcomes) and emits a single
normalized fill_events.parquet at data/datasets/fill_events.parquet.

Pure deterministic normalization — no LLM, no new data. Each scenario event
becomes one row. Lesson outcomes augment scenario events with success/fail
counts where the (client, date) pair matches; every event of a matched
scenario receives the same scenario-level outcome values.

Reproducibility: identical inputs → bit-identical output. event_id is
SHA1(client|date|role|at|city) truncated to 16 hex chars; rows are sorted by
event_id before write so re-runs produce the same parquet. event_id does not
include state, kind or count, so two events sharing (client, date, role, at,
city) receive the same id; the script warns on stderr when that happens.
"""
import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

REPO = Path(__file__).resolve().parents[2]
SCENARIO_DIR = REPO / "tests" / "multi-agent" / "scenarios"
LESSONS_DIR = REPO / "data" / "_playbook_lessons"
OUT_PATH = REPO / "data" / "datasets" / "fill_events.parquet"


def event_id(client: str, date: str, role: str, at: str, city: str) -> str:
    """Return the first 16 hex chars of SHA1("client|date|role|at|city")."""
    h = hashlib.sha1(f"{client}|{date}|{role}|{at}|{city}".encode()).hexdigest()
    return h[:16]


def _read_json(path: Path) -> dict:
    """Parse *path* as UTF-8 JSON and return it when the top level is an object.

    Returns an empty dict (after a note on stderr) when the file is not valid
    UTF-8, not valid JSON, or its top-level value is not an object. Callers
    then skip it through their normal "missing client/date" checks, so one
    malformed file does not abort the whole build.
    """
    try:
        # Explicit encoding: the default would depend on the platform locale.
        data = json.loads(path.read_text(encoding="utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as err:
        print(f"skipping {path.relative_to(REPO)}: {err}", file=sys.stderr)
        return {}
    if not isinstance(data, dict):
        print(
            f"skipping {path.relative_to(REPO)}: top-level JSON is not an object",
            file=sys.stderr,
        )
        return {}
    return data


def load_lessons() -> dict:
    """Return a map of (client, date) → outcome column values.

    Files lacking a truthy "client" or "date" are ignored. Files are visited
    in sorted filename order, so if several share a (client, date) the last
    one wins.
    """
    out: dict = {}
    for path in sorted(LESSONS_DIR.glob("*.json")):
        d = _read_json(path)
        client = d.get("client")
        date = d.get("date")
        if not client or not date:
            continue
        out[(client, date)] = {
            "outcome_events_total": d.get("events_total"),
            "outcome_events_ok": d.get("events_ok"),
            "outcome_checkpoint_count": d.get("checkpoint_count"),
            "outcome_model": d.get("model"),
            "outcome_cloud": d.get("cloud"),
            "outcome_lesson_path": str(path.relative_to(REPO)),
        }
    return out


def load_scenarios(lessons: dict) -> list[dict]:
    """Flatten every scenario event into one row dict.

    Args:
        lessons: map produced by load_lessons(), keyed by (client, date).

    Returns:
        Row dicts (in scenario-file then event order) whose keys match the
        parquet schema built in main(). Scenarios lacking client, date or
        events are skipped, as are event entries that are not JSON objects.
    """
    rows: list[dict] = []
    for path in sorted(SCENARIO_DIR.glob("scen_*.json")):
        d = _read_json(path)
        client = d.get("client")
        date = d.get("date")
        contract = d.get("contract") or {}
        events = d.get("events") or []
        if not client or not date or not events:
            continue
        outcome = lessons.get((client, date), {})
        source_file = str(path.relative_to(REPO))
        for event in events:
            if not isinstance(event, dict):
                # Malformed entry; skip it rather than crash on .get below.
                continue
            role = event.get("role") or ""
            at = event.get("at") or ""
            city = event.get("city") or ""
            state = event.get("state") or ""
            rows.append({
                "event_id": event_id(client, date, role, at, city),
                "source_file": source_file,
                "source_kind": "scenario",
                "client": client,
                "date": date,
                "city": city,
                "state": state,
                "role": role,
                "count": int(event.get("count") or 0),
                "kind": event.get("kind") or "",
                "at": at,
                "shift_start": event.get("shift_start") or "",
                "contract_deadline": contract.get("deadline"),
                "contract_budget_per_hour_max": contract.get("budget_per_hour_max"),
                "contract_local_bonus_per_hour": contract.get("local_bonus_per_hour"),
                "contract_local_bonus_radius_mi": contract.get("local_bonus_radius_mi"),
                "contract_fill_requirement": contract.get("fill_requirement"),
                "outcome_events_total": outcome.get("outcome_events_total"),
                "outcome_events_ok": outcome.get("outcome_events_ok"),
                "outcome_checkpoint_count": outcome.get("outcome_checkpoint_count"),
                "outcome_model": outcome.get("outcome_model"),
                "outcome_cloud": outcome.get("outcome_cloud"),
                "outcome_lesson_path": outcome.get("outcome_lesson_path"),
            })
    return rows


def main() -> int:
    """Build and write fill_events.parquet; return a process exit code.

    Returns 1 (with a message on stderr) when no rows were produced,
    otherwise writes the file, prints a summary to stdout and returns 0.
    """
    lessons = load_lessons()
    rows = load_scenarios(lessons)
    if not rows:
        print("no rows produced — scenario dir empty?", file=sys.stderr)
        return 1
    # list.sort is stable, so rows sharing an event_id keep their
    # (deterministic) file/event order and output stays reproducible.
    rows.sort(key=lambda r: r["event_id"])

    ids = [r["event_id"] for r in rows]
    duplicates = len(ids) - len(set(ids))
    if duplicates:
        print(
            f"warning: {duplicates} row(s) share an event_id with another row "
            "(event_id omits state, kind and count)",
            file=sys.stderr,
        )

    schema = pa.schema([
        ("event_id", pa.string()),
        ("source_file", pa.string()),
        ("source_kind", pa.string()),
        ("client", pa.string()),
        ("date", pa.string()),
        ("city", pa.string()),
        ("state", pa.string()),
        ("role", pa.string()),
        ("count", pa.int32()),
        ("kind", pa.string()),
        ("at", pa.string()),
        ("shift_start", pa.string()),
        ("contract_deadline", pa.string()),
        ("contract_budget_per_hour_max", pa.int32()),
        ("contract_local_bonus_per_hour", pa.int32()),
        ("contract_local_bonus_radius_mi", pa.int32()),
        ("contract_fill_requirement", pa.string()),
        ("outcome_events_total", pa.int32()),
        ("outcome_events_ok", pa.int32()),
        ("outcome_checkpoint_count", pa.int32()),
        ("outcome_model", pa.string()),
        ("outcome_cloud", pa.bool_()),
        ("outcome_lesson_path", pa.string()),
    ])
    table = pa.Table.from_pylist(rows, schema=schema)
    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(table, OUT_PATH, compression="snappy")

    matched = sum(1 for r in rows if r["outcome_events_total"] is not None)
    print(f"fill_events.parquet written: {OUT_PATH.relative_to(REPO)}")
    print(f"  rows: {len(rows)}")
    print(f"  scenarios: {len({r['source_file'] for r in rows})}")
    print(f"  with outcome: {matched}")
    print(f"  unique (client,date): {len({(r['client'], r['date']) for r in rows})}")
    print(f"  generated_at: {datetime.now(timezone.utc).isoformat(timespec='seconds')}")
    return 0


if __name__ == "__main__":
    sys.exit(main())