Complete working pipeline from Fountain script to validated scene JSON: - Schemas (Pydantic): all 7 layers defined upfront - Fountain parser + normalizer (Layer 1) - AI scene extractor with prompt contracts (Layer 2) - Schema validator + scene-specific semantic validator - Structured JSON logging per layer/scene execution - Versioned output writer (never overwrites) - Retry engine with 4-level failure escalation - Stop condition evaluator (per-unit + global halts) - Diff/drift detector for re-run comparison - CLI entry point with --dry-run, --scene, --test, --force - 3 test scripts (dialogue-heavy, action-heavy, nonstandard) - Expected output files for regression testing Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
191 lines
6.9 KiB
Python
"""CLI entry point for the AI Movie Production Pipeline."""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, validate inputs, and run Phase 1.

    Exits with status 1 on missing API key, missing script file, or a
    failed pipeline run; status 0 on success.
    """
    load_dotenv()

    cli = argparse.ArgumentParser(description="AI Movie Production Pipeline — Phase 1")
    # Table-driven registration keeps flag definitions compact and uniform.
    arg_specs = [
        ("--script", dict(type=str, help="Path to .fountain script file")),
        ("--project", dict(type=str, help="Project name (determines output directory)")),
        ("--model", dict(type=str, default="claude-sonnet-4-20250514", help="Model ID")),
        ("--scene", dict(type=int, default=None, help="Process only this scene number")),
        ("--dry-run", dict(action="store_true", help="Validate inputs only, no AI calls")),
        ("--force", dict(action="store_true", help="Ignore cache, re-run even if unchanged")),
        ("--test", dict(action="store_true", help="Run test suite against test_scripts/")),
        ("--output-dir", dict(type=str, default="output", help="Base output directory")),
    ]
    for flag, spec in arg_specs:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()

    # Test mode runs its own suite; run_tests() exits the process itself.
    if opts.test:
        run_tests(opts.model, opts.output_dir)
        return

    if not opts.script or not opts.project:
        cli.error("--script and --project are required (unless using --test)")

    api_key = os.environ.get("ANTHROPIC_API_KEY")
    # A dry run makes no AI calls, so the key is only mandatory otherwise.
    if not api_key and not opts.dry_run:
        print("ERROR: ANTHROPIC_API_KEY not set. Set it in .env or environment.")
        sys.exit(1)

    if not os.path.exists(opts.script):
        print(f"ERROR: Script file not found: {opts.script}")
        sys.exit(1)

    # Imported lazily so argument errors / --help stay fast.
    from src.execution.runner import run_phase1

    outcome = run_phase1(
        script_path=opts.script,
        project_name=opts.project,
        api_key=api_key or "",
        model=opts.model,
        output_dir=opts.output_dir,
        scene_filter=opts.scene,
        dry_run=opts.dry_run,
        force=opts.force,
    )

    if not outcome.success:
        print(f"\nPIPELINE FAILED: {outcome.stop_reason}")
        sys.exit(1)

    print("\nPIPELINE COMPLETE")
    sys.exit(0)
|
def run_tests(model: str, output_dir: str):
    """Run the regression suite against every script in test_scripts/.

    For each .fountain file, runs the full Phase 1 pipeline; when a matching
    expected-output JSON exists in test_scripts/expected/, checks:
      - scene count within 20% of ``expected_scene_count``
      - every ``expected_characters`` entry present (case-insensitive)
      - no ``must_not_contain_characters`` entry present (hallucination check)
      - every ``expected_locations`` entry present (case-insensitive)

    Exits the process: status 0 if all tests pass, 1 otherwise.
    """
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        print("ERROR: ANTHROPIC_API_KEY required for tests")
        sys.exit(1)

    test_dir = os.path.join(os.path.dirname(__file__), "test_scripts")
    expected_dir = os.path.join(test_dir, "expected")

    scripts = [f for f in os.listdir(test_dir) if f.endswith(".fountain")]
    if not scripts:
        print("No test scripts found in test_scripts/")
        sys.exit(1)

    # Imported lazily, consistent with main(): keeps startup cheap.
    from src.execution.runner import run_phase1

    all_passed = True

    for script_file in sorted(scripts):
        script_path = os.path.join(test_dir, script_file)
        stem = os.path.splitext(script_file)[0]
        project_name = f"test_{stem}"
        expected_file = os.path.join(expected_dir, f"{stem}_scenes.json")

        print(f"\n{'='*60}")
        print(f"TEST: {script_file}")
        print(f"{'='*60}")

        result = run_phase1(
            script_path=script_path,
            project_name=project_name,
            api_key=api_key,
            model=model,
            output_dir=output_dir,
        )

        # Check against expected regression file, if one exists.
        if os.path.exists(expected_file):
            with open(expected_file, "r", encoding="utf-8") as f:
                expected = json.load(f)

            passed = True

            # Scene count check (within 20%); skipped when no count given.
            expected_count = expected.get("expected_scene_count", 0)
            if expected_count > 0:
                deviation = abs(result.total_scenes - expected_count) / expected_count
                if deviation > 0.20:
                    print(f" FAIL: Scene count {result.total_scenes} vs expected {expected_count} (deviation {deviation:.0%})")
                    passed = False
                else:
                    print(f" PASS: Scene count {result.total_scenes} (expected {expected_count})")

            # Character checks — build the case-folded set ONCE instead of
            # rebuilding it for every expected/forbidden character.
            actual_characters = _collect_characters(output_dir, project_name)
            actual_chars_upper = {c.upper() for c in actual_characters}
            for char in expected.get("expected_characters", []):
                if char.upper() not in actual_chars_upper:
                    print(f" FAIL: Expected character '{char}' not found")
                    passed = False
                else:
                    print(f" PASS: Character '{char}' found")

            # Hallucination check: characters that must NOT appear.
            for char in expected.get("must_not_contain_characters", []):
                if char.upper() in actual_chars_upper:
                    print(f" FAIL: Hallucinated character '{char}' found")
                    passed = False

            # Location check — same hoisted-set pattern as characters.
            actual_locations = _collect_locations(output_dir, project_name)
            actual_locs_upper = {loc.upper() for loc in actual_locations}
            for loc in expected.get("expected_locations", []):
                if loc.upper() not in actual_locs_upper:
                    print(f" FAIL: Expected location '{loc}' not found")
                    passed = False
                else:
                    print(f" PASS: Location '{loc}' found")

            if passed:
                print(" RESULT: PASSED")
            else:
                print(" RESULT: FAILED")
                all_passed = False
        else:
            print(" No expected output file — skipping regression checks")

        # A failed pipeline run fails the test regardless of whether an
        # expected-output file existed (previously only checked in the
        # no-expected-file branch, letting a failed run slip through).
        if not result.success:
            all_passed = False

    print(f"\n{'='*60}")
    if all_passed:
        print("ALL TESTS PASSED")
        sys.exit(0)
    else:
        print("SOME TESTS FAILED")
        sys.exit(1)
|
def _collect_characters(output_dir: str, project_name: str) -> set[str]:
|
|
"""Collect all character names from L2 scene outputs."""
|
|
l2_dir = os.path.join(output_dir, project_name, "L2")
|
|
characters: set[str] = set()
|
|
if not os.path.exists(l2_dir):
|
|
return characters
|
|
for f in os.listdir(l2_dir):
|
|
if f.startswith("scene_") and f.endswith(".json") and f != "latest.json":
|
|
with open(os.path.join(l2_dir, f), "r", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
characters.update(data.get("characters_present", []))
|
|
return characters
|
|
|
|
|
|
def _collect_locations(output_dir: str, project_name: str) -> set[str]:
|
|
"""Collect all location names from L2 scene outputs."""
|
|
l2_dir = os.path.join(output_dir, project_name, "L2")
|
|
locations: set[str] = set()
|
|
if not os.path.exists(l2_dir):
|
|
return locations
|
|
for f in os.listdir(l2_dir):
|
|
if f.startswith("scene_") and f.endswith(".json") and f != "latest.json":
|
|
with open(os.path.join(l2_dir, f), "r", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
loc = data.get("location", "")
|
|
if loc:
|
|
locations.add(loc)
|
|
return locations
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|