staffing: face pool fetch preserves prior tags + --shrink gate + atomic manifest write
fetch_face_pool was wiping 952 hand-classified rows when re-run from a Python without deepface installed (it reset every gender to None). Now: - Loads existing manifest by id and overlays only fetch-owned fields, so gender/race/age/excluded survive a refetch. - deepface pass tags only records that don't already have a gender; deepface unavailable means "leave existing tags alone" not "reset". - New --shrink flag required to drop ids >= --count. Default refuses to shrink the pool silently. - Atomic write via tmp + os.replace so an interrupted run can't corrupt the manifest. - Dedupes duplicate id lines (root cause of the 2497-row manifest backing a 1000-face pool). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
51cc0a69cf
commit
313eec3c6e
@ -175,7 +175,7 @@
|
||||
{"id": 174, "file": "face_0174.jpg", "gender": "woman", "race": "east_asian", "age": 26}
|
||||
{"id": 175, "file": "face_0175.jpg", "gender": "man", "race": "caucasian", "age": 30}
|
||||
{"id": 176, "file": "face_0176.jpg", "gender": "man", "race": "middle_eastern", "age": 37}
|
||||
{"id": 177, "file": "face_0177.jpg", "gender": "man", "race": "middle_eastern", "age": 37}
|
||||
{"id": 177, "file": "face_0177.jpg", "gender": "woman", "race": "caucasian", "age": 25}
|
||||
{"id": 178, "file": "face_0178.jpg", "gender": "woman", "race": "caucasian", "age": 28}
|
||||
{"id": 179, "file": "face_0179.jpg", "gender": "man", "race": "hispanic", "age": 28}
|
||||
{"id": 180, "file": "face_0180.jpg", "gender": "woman", "race": "caucasian", "age": 38}
|
||||
@ -588,7 +588,7 @@
|
||||
{"id": 587, "file": "face_0587.jpg", "gender": "man", "race": "caucasian", "age": 34}
|
||||
{"id": 588, "file": "face_0588.jpg", "gender": "man", "race": "caucasian", "age": 21, "excluded": "minor"}
|
||||
{"id": 589, "file": "face_0589.jpg", "gender": "man", "race": "caucasian", "age": 21, "excluded": "minor"}
|
||||
{"id": 590, "file": "face_0590.jpg", "gender": "man", "race": "caucasian", "age": 26}
|
||||
{"id": 590, "file": "face_0590.jpg", "gender": "woman", "race": "hispanic", "age": 31}
|
||||
{"id": 591, "file": "face_0591.jpg", "gender": "woman", "race": "hispanic", "age": 31}
|
||||
{"id": 592, "file": "face_0592.jpg", "gender": "woman", "race": "caucasian", "age": 28}
|
||||
{"id": 593, "file": "face_0593.jpg", "gender": "woman", "race": "caucasian", "age": 28}
|
||||
|
||||
@ -53,20 +53,30 @@ def fetch_one(idx: int, out_dir: str) -> tuple[int, str, bool, str | None]:
|
||||
|
||||
|
||||
def maybe_tag_gender(records: list[dict], out_dir: str) -> dict[str, int]:
|
||||
"""If deepface is installed, label each record with gender. Returns
|
||||
a count summary; mutates records in place. On import error, returns
|
||||
None and tags every record as unknown."""
|
||||
"""If deepface is installed, label records that don't already have a
|
||||
gender. Returns a count summary; mutates records in place.
|
||||
|
||||
Preservation contract: never overwrites prior `gender` (or any other
|
||||
tag — race/age/excluded — set by tag_face_pool.py). On deepface
|
||||
import failure, leaves existing tags alone instead of resetting them
|
||||
to None. The previous behavior wiped 952 hand-classified rows when
|
||||
fetch_face_pool was re-run from a Python without deepface installed."""
|
||||
try:
|
||||
from deepface import DeepFace # type: ignore
|
||||
except Exception as e:
|
||||
print(f" (deepface unavailable: {e}) — pool will mix naturally")
|
||||
print(f" (deepface unavailable: {e}) — leaving existing tags untouched")
|
||||
for r in records:
|
||||
r["gender"] = None
|
||||
return {"unknown": len(records)}
|
||||
r.setdefault("gender", None)
|
||||
already = sum(1 for r in records if r.get("gender") in ("man", "woman"))
|
||||
return {"preserved_tagged": already, "untagged": len(records) - already}
|
||||
|
||||
print(" tagging gender via deepface (CPU; ~0.5-1s per face)…")
|
||||
todo = [r for r in records if r.get("gender") not in ("man", "woman")]
|
||||
if not todo:
|
||||
print(" every record already has gender — nothing to tag.")
|
||||
return {"preserved_tagged": len(records)}
|
||||
print(f" tagging gender via deepface ({len(todo)} of {len(records)} records, CPU; ~0.5-1s per face)…")
|
||||
counts: dict[str, int] = {}
|
||||
for i, r in enumerate(records):
|
||||
for i, r in enumerate(todo):
|
||||
full = os.path.join(out_dir, r["file"])
|
||||
try:
|
||||
ana = DeepFace.analyze(
|
||||
@ -88,7 +98,7 @@ def maybe_tag_gender(records: list[dict], out_dir: str) -> dict[str, int]:
|
||||
r["gender_error"] = f"{type(e).__name__}: {e}"
|
||||
counts[r["gender"] or "unknown"] = counts.get(r["gender"] or "unknown", 0) + 1
|
||||
if (i + 1) % 25 == 0:
|
||||
print(f" [{i+1}/{len(records)}] {counts}")
|
||||
print(f" [{i+1}/{len(todo)}] {counts}")
|
||||
return counts
|
||||
|
||||
|
||||
@ -101,11 +111,47 @@ def main():
|
||||
)
|
||||
p.add_argument("--concurrency", type=int, default=3, help="parallel fetches (be polite)")
|
||||
p.add_argument("--no-gender", action="store_true", help="skip deepface gender tagging")
|
||||
p.add_argument("--shrink", action="store_true",
|
||||
help="allow --count to drop manifest entries with id >= count. Default: preserve them.")
|
||||
args = p.parse_args()
|
||||
|
||||
out = os.path.realpath(args.out)
|
||||
os.makedirs(out, exist_ok=True)
|
||||
|
||||
# Load any existing manifest into a by-id dict so prior tags
|
||||
# (gender / race / age / excluded) survive the rewrite. Also
|
||||
# naturally dedupes — if the file accidentally has duplicate
|
||||
# lines for the same id (this is how we ended up with a 2497-
|
||||
# row manifest backing a 1000-face pool), the last one wins.
|
||||
manifest = os.path.join(out, "manifest.jsonl")
|
||||
existing: dict[int, dict] = {}
|
||||
if os.path.exists(manifest):
|
||||
dup_count = 0
|
||||
with open(manifest) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
row = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
rid = row.get("id")
|
||||
if not isinstance(rid, int):
|
||||
continue
|
||||
if rid in existing:
|
||||
dup_count += 1
|
||||
existing[rid] = row
|
||||
print(f"Loaded existing manifest: {len(existing)} unique ids ({dup_count} duplicate lines collapsed)")
|
||||
max_existing = max(existing.keys()) if existing else -1
|
||||
if max_existing >= args.count and not args.shrink:
|
||||
print(
|
||||
f"\nERROR: --count={args.count} would drop {sum(1 for k in existing if k >= args.count)} "
|
||||
f"manifest entries (max existing id = {max_existing}). Pass --shrink to allow.\n",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(2)
|
||||
|
||||
print(f"Fetching {args.count} faces → {out}")
|
||||
print(f"Source: {URL} (synthetic StyleGAN — no real people)")
|
||||
|
||||
@ -115,12 +161,16 @@ def main():
|
||||
futs = {ex.submit(fetch_one, i, out): i for i in range(args.count)}
|
||||
for done, fut in enumerate(as_completed(futs), 1):
|
||||
idx, fname, cached, err = fut.result()
|
||||
results[idx] = {
|
||||
# Start from prior manifest row (preserves gender/race/age/excluded)
|
||||
# and overlay only the fields fetch_one is responsible for.
|
||||
base = dict(existing.get(idx, {}))
|
||||
base.update({
|
||||
"id": idx,
|
||||
"file": fname,
|
||||
"cached": cached,
|
||||
"error": err,
|
||||
}
|
||||
})
|
||||
results[idx] = base
|
||||
if done % 25 == 0 or done == args.count:
|
||||
ok = sum(1 for r in results if r and not r.get("error"))
|
||||
print(f" [{done}/{args.count}] {ok} ok ({time.time()-t0:.1f}s)")
|
||||
@ -128,6 +178,9 @@ def main():
|
||||
# Drop slots that errored or are still None (shouldn't happen)
|
||||
records = [r for r in results if r and not r.get("error")]
|
||||
print(f"\nPool ready: {len(records)} faces, {sum(1 for r in records if r['cached'])} from cache")
|
||||
preserved_tags = sum(1 for r in records if r.get("gender") in ("man", "woman"))
|
||||
if preserved_tags:
|
||||
print(f"Preserved {preserved_tags} prior gender tags (and any race/age/excluded fields).")
|
||||
|
||||
if not args.no_gender and records:
|
||||
print("\nGender-tagging pass:")
|
||||
@ -135,17 +188,28 @@ def main():
|
||||
print(f" distribution: {summary}")
|
||||
else:
|
||||
for r in records:
|
||||
r["gender"] = None
|
||||
r.setdefault("gender", None)
|
||||
|
||||
# If --shrink was NOT used and somehow id >= count rows are still in
|
||||
# `existing` (which can only happen if the early gate was bypassed),
|
||||
# carry them forward so we don't quietly drop them.
|
||||
if not args.shrink:
|
||||
for rid, row in existing.items():
|
||||
if rid >= args.count and rid not in {r["id"] for r in records}:
|
||||
records.append(row)
|
||||
records.sort(key=lambda r: r.get("id", 0))
|
||||
|
||||
# Strip transient flags before persisting
|
||||
for r in records:
|
||||
r.pop("cached", None)
|
||||
r.pop("error", None)
|
||||
|
||||
manifest = os.path.join(out, "manifest.jsonl")
|
||||
with open(manifest, "w") as f:
|
||||
# Atomic write — if a re-run is interrupted, manifest stays intact.
|
||||
tmp = manifest + ".tmp"
|
||||
with open(tmp, "w") as f:
|
||||
for r in records:
|
||||
f.write(json.dumps(r) + "\n")
|
||||
os.replace(tmp, manifest)
|
||||
print(f"\nManifest: {manifest} ({len(records)} entries)")
|
||||
|
||||
# Quick checksum manifest for downstream debugging
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user