#!/usr/bin/env python3
"""
fetch_face_pool.py — pull N synthetic headshots from
https://thispersondoesnotexist.com/, write them to data/headshots/face_NNNN.jpg,
optionally tag each with gender via deepface, and emit a JSONL manifest.

Each fetch is a fresh StyleGAN face — no real people. Deterministic per-worker
mapping happens at serve time (mcp-server hashes the worker key into the pool);
this script just builds the pool.

Usage:
    python3 scripts/staffing/fetch_face_pool.py --count 300 --concurrency 3
    python3 scripts/staffing/fetch_face_pool.py --count 50 --no-gender

Re-running is idempotent: existing face_NNNN.jpg files are skipped, and the
manifest is rewritten from disk state.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.error
import urllib.request

URL = "https://thispersondoesnotexist.com/"
UA = "Lakehouse/1.0 (face-pool fetch · synthetic-only · no real-person tracking)"
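
# The serve-time mapping mentioned in the module docstring lives in mcp-server,
# not here. As a rough sketch of the idea (names like `worker_key` and
# `pool_size` are illustrative, not the server's actual API):
#
#     idx = int(hashlib.sha256(worker_key.encode()).hexdigest(), 16) % pool_size
#     face = f"face_{idx:04d}.jpg"
#
# The same worker key always resolves to the same face, with no per-worker
# state kept anywhere.
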
def fetch_one(idx: int, out_dir: str) -> tuple[int, str, bool, str | None]:
    """Returns (idx, basename, cached, error)."""
    fname = f"face_{idx:04d}.jpg"
    full = os.path.join(out_dir, fname)
    if os.path.exists(full) and os.path.getsize(full) > 1024:
        return idx, fname, True, None
    try:
        req = urllib.request.Request(URL, headers={"User-Agent": UA})
        with urllib.request.urlopen(req, timeout=20) as resp:
            blob = resp.read()
        if len(blob) < 1024:
            return idx, fname, False, f"response too small ({len(blob)} bytes)"
        with open(full, "wb") as f:
            f.write(blob)
        return idx, fname, False, None
    except urllib.error.URLError as e:
        return idx, fname, False, f"urlerror: {e}"
    except Exception as e:
        return idx, fname, False, f"{type(e).__name__}: {e}"


def maybe_tag_gender(records: list[dict], out_dir: str) -> dict[str, int]:
    """If deepface is installed, label records that don't already have a gender.

    Returns a count summary; mutates records in place.

    Preservation contract: never overwrites prior `gender` (or any other tag —
    race/age/excluded — set by tag_face_pool.py). On deepface import failure,
    leaves existing tags alone instead of resetting them to None. The previous
    behavior wiped 952 hand-classified rows when fetch_face_pool was re-run
    from a Python without deepface installed.
    """
    try:
        from deepface import DeepFace  # type: ignore
    except Exception as e:
        print(f" (deepface unavailable: {e}) — leaving existing tags untouched")
        for r in records:
            r.setdefault("gender", None)
        already = sum(1 for r in records if r.get("gender") in ("man", "woman"))
        return {"preserved_tagged": already, "untagged": len(records) - already}

    todo = [r for r in records if r.get("gender") not in ("man", "woman")]
    if not todo:
        print(" every record already has gender — nothing to tag.")
        return {"preserved_tagged": len(records)}

    print(f" tagging gender via deepface ({len(todo)} of {len(records)} records, CPU; ~0.5-1s per face)…")
    counts: dict[str, int] = {}
    for i, r in enumerate(todo):
        full = os.path.join(out_dir, r["file"])
        try:
            ana = DeepFace.analyze(
                img_path=full,
                actions=["gender"],
                enforce_detection=False,
                silent=True,
            )
            if isinstance(ana, list):
                ana = ana[0] if ana else {}
            g_raw = (ana.get("dominant_gender") or "").lower().strip()
            r["gender"] = (
                "man" if g_raw.startswith("man")
                else "woman" if g_raw.startswith("woman")
                else None
            )
        except Exception as e:
            r["gender"] = None
            r["gender_error"] = f"{type(e).__name__}: {e}"
        counts[r["gender"] or "unknown"] = counts.get(r["gender"] or "unknown", 0) + 1
        if (i + 1) % 25 == 0:
            print(f" [{i+1}/{len(todo)}] {counts}")
    return counts
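
# For reference, a persisted manifest row looks like this (values illustrative;
# race/age/excluded fields only appear once tag_face_pool.py has added them):
#
#     {"id": 17, "file": "face_0017.jpg", "gender": "woman"}
#
# One JSON object per line; on load, duplicate ids collapse and the last wins.
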
def main():
    p = argparse.ArgumentParser()
    p.add_argument("--count", type=int, default=300, help="how many faces to maintain in pool")
    p.add_argument(
        "--out",
        default=os.path.join(os.path.dirname(__file__), "..", "..", "data", "headshots"),
    )
    p.add_argument("--concurrency", type=int, default=3, help="parallel fetches (be polite)")
    p.add_argument("--no-gender", action="store_true", help="skip deepface gender tagging")
    p.add_argument(
        "--shrink",
        action="store_true",
        help="allow --count to drop manifest entries with id >= count. Default: preserve them.",
    )
    args = p.parse_args()

    out = os.path.realpath(args.out)
    os.makedirs(out, exist_ok=True)

    # Load any existing manifest into a by-id dict so prior tags
    # (gender / race / age / excluded) survive the rewrite. Also
    # naturally dedupes — if the file accidentally has duplicate
    # lines for the same id (this is how we ended up with a 2497-
    # row manifest backing a 1000-face pool), the last one wins.
    manifest = os.path.join(out, "manifest.jsonl")
    existing: dict[int, dict] = {}
    if os.path.exists(manifest):
        dup_count = 0
        with open(manifest) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    row = json.loads(line)
                except json.JSONDecodeError:
                    continue
                rid = row.get("id")
                if not isinstance(rid, int):
                    continue
                if rid in existing:
                    dup_count += 1
                existing[rid] = row
        print(f"Loaded existing manifest: {len(existing)} unique ids ({dup_count} duplicate lines collapsed)")

    max_existing = max(existing.keys()) if existing else -1
    if max_existing >= args.count and not args.shrink:
        print(
            f"\nERROR: --count={args.count} would drop {sum(1 for k in existing if k >= args.count)} "
            f"manifest entries (max existing id = {max_existing}). Pass --shrink to allow.\n",
            file=sys.stderr,
        )
        sys.exit(2)

    print(f"Fetching {args.count} faces → {out}")
    print(f"Source: {URL} (synthetic StyleGAN — no real people)")

    results: list[dict] = [None] * args.count  # type: ignore
    t0 = time.time()
    with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as ex:
        futs = {ex.submit(fetch_one, i, out): i for i in range(args.count)}
        for done, fut in enumerate(as_completed(futs), 1):
            idx, fname, cached, err = fut.result()
            # Start from the prior manifest row (preserves gender/race/age/excluded)
            # and overlay only the fields fetch_one is responsible for.
            base = dict(existing.get(idx, {}))
            base.update({
                "id": idx,
                "file": fname,
                "cached": cached,
                "error": err,
            })
            results[idx] = base
            if done % 25 == 0 or done == args.count:
                ok = sum(1 for r in results if r and not r.get("error"))
                print(f" [{done}/{args.count}] {ok} ok ({time.time()-t0:.1f}s)")

    # Drop slots that errored or are still None (shouldn't happen)
    records = [r for r in results if r and not r.get("error")]
    print(f"\nPool ready: {len(records)} faces, {sum(1 for r in records if r['cached'])} from cache")

    preserved_tags = sum(1 for r in records if r.get("gender") in ("man", "woman"))
    if preserved_tags:
        print(f"Preserved {preserved_tags} prior gender tags (and any race/age/excluded fields).")

    if not args.no_gender and records:
        print("\nGender-tagging pass:")
        summary = maybe_tag_gender(records, out)
        print(f" distribution: {summary}")
    else:
        for r in records:
            r.setdefault("gender", None)

    # If --shrink was NOT used and somehow id >= count rows are still in
    # `existing` (which can only happen if the early gate was bypassed),
    # carry them forward so we don't quietly drop them.
    if not args.shrink:
        kept_ids = {r["id"] for r in records}
        for rid, row in existing.items():
            if rid >= args.count and rid not in kept_ids:
                records.append(row)
        records.sort(key=lambda r: r.get("id", 0))

    # Strip transient flags before persisting
    for r in records:
        r.pop("cached", None)
        r.pop("error", None)

    # Atomic write — if a re-run is interrupted, the manifest stays intact.
    tmp = manifest + ".tmp"
    with open(tmp, "w") as f:
        for r in records:
            f.write(json.dumps(r) + "\n")
    os.replace(tmp, manifest)
    print(f"\nManifest: {manifest} ({len(records)} entries)")

    # Quick checksum of the manifest for downstream debugging
    h = hashlib.sha256()
    for r in records:
        h.update(r["file"].encode())
        h.update(b"|")
        h.update((r.get("gender") or "?").encode())
    print(f"Pool fingerprint (sha256): {h.hexdigest()[:16]}")


if __name__ == "__main__":
    main()
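
# Downstream sanity check, for reference (a sketch; the manifest path assumes
# the default --out):
#
#     import collections, json
#     with open("data/headshots/manifest.jsonl") as f:
#         dist = collections.Counter(
#             json.loads(line).get("gender") or "untagged"
#             for line in f
#             if line.strip()
#         )
#     print(dist)  # e.g. Counter({'man': ..., 'woman': ..., 'untagged': ...})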