lakehouse/scripts/staffing/fetch_face_pool.py
root 313eec3c6e staffing: face pool fetch preserves prior tags + --shrink gate + atomic manifest write
fetch_face_pool was wiping 952 hand-classified rows when re-run from
a Python without deepface installed (it reset every gender to None).
Now:

- Loads existing manifest by id and overlays only fetch-owned fields,
  so gender/race/age/excluded survive a refetch.
- deepface pass tags only records that don't already have a gender;
  deepface unavailable means "leave existing tags alone" not "reset".
- New --shrink flag required to drop ids >= --count. Default refuses
  to shrink the pool silently.
- Atomic write via tmp + os.replace so an interrupted run can't
  corrupt the manifest.
- Dedupes duplicate id lines (root cause of the 2497-row manifest
  backing a 1000-face pool).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 05:35:19 -05:00

226 lines
9.1 KiB
Python

#!/usr/bin/env python3
"""
fetch_face_pool.py — pull N synthetic headshots from
https://thispersondoesnotexist.com/, write to data/headshots/face_NNNN.jpg,
optionally tag each with gender via deepface, emit a JSONL manifest.
Each fetch is a fresh StyleGAN face — no real people. Deterministic per
worker mapping happens at serve time (mcp-server hashes the worker key
into the pool); this script just builds the pool.
Usage:
python3 scripts/staffing/fetch_face_pool.py --count 300 --concurrency 3
python3 scripts/staffing/fetch_face_pool.py --count 50 --no-gender
Re-running is idempotent: existing face_NNNN.jpg files are skipped, and
the manifest is rewritten from disk state.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import urllib.error
URL = "https://thispersondoesnotexist.com/"
UA = "Lakehouse/1.0 (face-pool fetch · synthetic-only · no real-person tracking)"
def fetch_one(idx: int, out_dir: str) -> tuple[int, str, bool, str | None]:
"""Returns (idx, basename, cached, error)."""
fname = f"face_{idx:04d}.jpg"
full = os.path.join(out_dir, fname)
if os.path.exists(full) and os.path.getsize(full) > 1024:
return idx, fname, True, None
try:
req = urllib.request.Request(URL, headers={"User-Agent": UA})
with urllib.request.urlopen(req, timeout=20) as resp:
blob = resp.read()
if len(blob) < 1024:
return idx, fname, False, f"response too small ({len(blob)} bytes)"
with open(full, "wb") as f:
f.write(blob)
return idx, fname, False, None
except urllib.error.URLError as e:
return idx, fname, False, f"urlerror: {e}"
except Exception as e:
return idx, fname, False, f"{type(e).__name__}: {e}"
def maybe_tag_gender(records: list[dict], out_dir: str) -> dict[str, int]:
"""If deepface is installed, label records that don't already have a
gender. Returns a count summary; mutates records in place.
Preservation contract: never overwrites prior `gender` (or any other
tag — race/age/excluded — set by tag_face_pool.py). On deepface
import failure, leaves existing tags alone instead of resetting them
to None. The previous behavior wiped 952 hand-classified rows when
fetch_face_pool was re-run from a Python without deepface installed."""
try:
from deepface import DeepFace # type: ignore
except Exception as e:
print(f" (deepface unavailable: {e}) — leaving existing tags untouched")
for r in records:
r.setdefault("gender", None)
already = sum(1 for r in records if r.get("gender") in ("man", "woman"))
return {"preserved_tagged": already, "untagged": len(records) - already}
todo = [r for r in records if r.get("gender") not in ("man", "woman")]
if not todo:
print(" every record already has gender — nothing to tag.")
return {"preserved_tagged": len(records)}
print(f" tagging gender via deepface ({len(todo)} of {len(records)} records, CPU; ~0.5-1s per face)…")
counts: dict[str, int] = {}
for i, r in enumerate(todo):
full = os.path.join(out_dir, r["file"])
try:
ana = DeepFace.analyze(
img_path=full,
actions=["gender"],
enforce_detection=False,
silent=True,
)
if isinstance(ana, list):
ana = ana[0] if ana else {}
g_raw = (ana.get("dominant_gender") or "").lower().strip()
r["gender"] = (
"man" if g_raw.startswith("man") else
"woman" if g_raw.startswith("woman") else
None
)
except Exception as e:
r["gender"] = None
r["gender_error"] = f"{type(e).__name__}: {e}"
counts[r["gender"] or "unknown"] = counts.get(r["gender"] or "unknown", 0) + 1
if (i + 1) % 25 == 0:
print(f" [{i+1}/{len(todo)}] {counts}")
return counts
def main():
p = argparse.ArgumentParser()
p.add_argument("--count", type=int, default=300, help="how many faces to maintain in pool")
p.add_argument(
"--out",
default=os.path.join(os.path.dirname(__file__), "..", "..", "data", "headshots"),
)
p.add_argument("--concurrency", type=int, default=3, help="parallel fetches (be polite)")
p.add_argument("--no-gender", action="store_true", help="skip deepface gender tagging")
p.add_argument("--shrink", action="store_true",
help="allow --count to drop manifest entries with id >= count. Default: preserve them.")
args = p.parse_args()
out = os.path.realpath(args.out)
os.makedirs(out, exist_ok=True)
# Load any existing manifest into a by-id dict so prior tags
# (gender / race / age / excluded) survive the rewrite. Also
# naturally dedupes — if the file accidentally has duplicate
# lines for the same id (this is how we ended up with a 2497-
# row manifest backing a 1000-face pool), the last one wins.
manifest = os.path.join(out, "manifest.jsonl")
existing: dict[int, dict] = {}
if os.path.exists(manifest):
dup_count = 0
with open(manifest) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
rid = row.get("id")
if not isinstance(rid, int):
continue
if rid in existing:
dup_count += 1
existing[rid] = row
print(f"Loaded existing manifest: {len(existing)} unique ids ({dup_count} duplicate lines collapsed)")
max_existing = max(existing.keys()) if existing else -1
if max_existing >= args.count and not args.shrink:
print(
f"\nERROR: --count={args.count} would drop {sum(1 for k in existing if k >= args.count)} "
f"manifest entries (max existing id = {max_existing}). Pass --shrink to allow.\n",
file=sys.stderr,
)
sys.exit(2)
print(f"Fetching {args.count} faces → {out}")
print(f"Source: {URL} (synthetic StyleGAN — no real people)")
results: list[dict] = [None] * args.count # type: ignore
t0 = time.time()
with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as ex:
futs = {ex.submit(fetch_one, i, out): i for i in range(args.count)}
for done, fut in enumerate(as_completed(futs), 1):
idx, fname, cached, err = fut.result()
# Start from prior manifest row (preserves gender/race/age/excluded)
# and overlay only the fields fetch_one is responsible for.
base = dict(existing.get(idx, {}))
base.update({
"id": idx,
"file": fname,
"cached": cached,
"error": err,
})
results[idx] = base
if done % 25 == 0 or done == args.count:
ok = sum(1 for r in results if r and not r.get("error"))
print(f" [{done}/{args.count}] {ok} ok ({time.time()-t0:.1f}s)")
# Drop slots that errored or are still None (shouldn't happen)
records = [r for r in results if r and not r.get("error")]
print(f"\nPool ready: {len(records)} faces, {sum(1 for r in records if r['cached'])} from cache")
preserved_tags = sum(1 for r in records if r.get("gender") in ("man", "woman"))
if preserved_tags:
print(f"Preserved {preserved_tags} prior gender tags (and any race/age/excluded fields).")
if not args.no_gender and records:
print("\nGender-tagging pass:")
summary = maybe_tag_gender(records, out)
print(f" distribution: {summary}")
else:
for r in records:
r.setdefault("gender", None)
# If --shrink was NOT used and somehow id >= count rows are still in
# `existing` (which can only happen if the early gate was bypassed),
# carry them forward so we don't quietly drop them.
if not args.shrink:
for rid, row in existing.items():
if rid >= args.count and rid not in {r["id"] for r in records}:
records.append(row)
records.sort(key=lambda r: r.get("id", 0))
# Strip transient flags before persisting
for r in records:
r.pop("cached", None)
r.pop("error", None)
# Atomic write — if a re-run is interrupted, manifest stays intact.
tmp = manifest + ".tmp"
with open(tmp, "w") as f:
for r in records:
f.write(json.dumps(r) + "\n")
os.replace(tmp, manifest)
print(f"\nManifest: {manifest} ({len(records)} entries)")
# Quick checksum manifest for downstream debugging
h = hashlib.sha256()
for r in records:
h.update(r["file"].encode())
h.update(b"|")
h.update((r.get("gender") or "?").encode())
print(f"Pool fingerprint (sha256): {h.hexdigest()[:16]}")
if __name__ == "__main__":
main()