Surfaced by today's untracked-files audit. None of these files are
accidental: several are referenced by name in CLAUDE.md and memory files
but were never added to version control.
Categories:
- docs/PHASE_AUDIT_GUIDE.md (106 LOC) — Claude Code phase audit guidance
- ops/systemd/lakehouse-langfuse-bridge.service — Langfuse bridge unit
- package.json — top-level npm manifest
- scripts/e2e_pipeline_check.sh + production_smoke.sh — real test scripts
- reports/kimi/audit-last-week*.md — the "Two reports live" that CLAUDE.md cites
- tests/multi-agent/scenarios/ — 44 staffing scenarios (cutover decision A)
- tests/multi-agent/playbooks/ — 102 playbook records
- tests/battery/, tests/agent_test/PRD.md, tests/real-world/* — real tests
- sidecar/sidecar/{lab_ui,pipeline_lab}.py — 888 LOC of dev-only UIs that
  remain in service after the sidecar drop (commit ba928b1 explicitly kept them)
Sensitivity check: scenarios use synthetic company names ("Heritage Foods",
"Cornerstone Fabrication"); audit reports describe code findings only;
no PII or secrets surfaced.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

scripts/e2e_pipeline_check.sh (537 lines, 24 KiB, Bash, executable):

#!/usr/bin/env bash
# ------------------------------------------------------------
# End-to-end pipeline verification for Lakehouse.
#
# Generates realistic staffing-style data, runs it through every
# shipped pipeline stage, asserts correctness at each step, and
# cleans up after itself.
#
# Stages exercised:
#   0. Preflight — gateway + sidecar reachability
#   1. Data generation — 1000 candidates, 200 placements, 10 resumes
#   2. CSV ingest — Phase 6.1 (via ?name= query param)
#   3. NDJSON ingest — Phase 6.2
#   4. SQL queries + joins — Phase 2, Phase 8 hot cache
#   5. Content-hash re-ingest dedup — Phase 6.4
#   6. Idempotent register — ADR-020 (same-fingerprint path)
#   7. Schema-drift rejection — ADR-020 (409 Conflict path)
#   8. Catalog dedupe no-op — ADR-020 (clean state)
#   9. Metadata enrichment — Phase 10 POST
#  10. PII auto-detection audit — Phase 10
#  11. Vector index + search — Phase 7 (documents pulled via SQL)
#  12. Cleanup + baseline verify — no-orphan guarantee
#
# Usage:
#   ./scripts/e2e_pipeline_check.sh                 # run all stages
#   SKIP_VECTOR=1 ./scripts/e2e_pipeline_check.sh   # skip Ollama-bound steps
#   KEEP_DATA=1 ./scripts/e2e_pipeline_check.sh     # leave /tmp artifacts
#
# Exit codes:
#   0  all assertions passed
#   1  one or more assertions failed
#   2  preflight failed (service unreachable)
# ------------------------------------------------------------
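
# Illustrative endpoint override (the host below is a made-up example; the
# real defaults are the localhost URLs assigned just after this block):
#   GATEWAY=http://lakehouse-stage:3100 SIDECAR=http://lakehouse-stage:3200 \
#     ./scripts/e2e_pipeline_check.sh
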
set -u
set -o pipefail

GATEWAY="${GATEWAY:-http://localhost:3100}"
SIDECAR="${SIDECAR:-http://localhost:3200}"
WORKDIR="${WORKDIR:-/tmp/lakehouse_e2e}"
DATA_ROOT="${DATA_ROOT:-/home/profit/lakehouse/data}"
SKIP_VECTOR="${SKIP_VECTOR:-0}"
KEEP_DATA="${KEEP_DATA:-0}"

RUN_ID="e2e_$(date +%s)"
CAND_DS="${RUN_ID}_candidates"
PLACE_DS="${RUN_ID}_placements"
RESUME_DS="${RUN_ID}_resumes"
VEC_IDX="${RESUME_DS}_v1"

# Color names use a CC_ prefix so they can't be shadowed by single-letter
# local variables like `R` that hold curl responses elsewhere in the script.
if [[ -t 1 ]]; then
  CC_GRN=$'\033[0;32m'; CC_RED=$'\033[0;31m'; CC_YLW=$'\033[1;33m'
  CC_BLU=$'\033[1;34m'; CC_DIM=$'\033[2m'; CC_RST=$'\033[0m'
else
  CC_GRN=''; CC_RED=''; CC_YLW=''; CC_BLU=''; CC_DIM=''; CC_RST=''
fi

PASS=0; FAIL=0; WARN=0; STARTED_AT=$(date +%s)
FAILURES=()

pass() { printf ' %s✓%s %s\n' "$CC_GRN" "$CC_RST" "$1"; PASS=$((PASS+1)); }
fail() { printf ' %s✗%s %s\n' "$CC_RED" "$CC_RST" "$1"; FAIL=$((FAIL+1)); FAILURES+=("$1"); }
warn() { printf ' %s!%s %s\n' "$CC_YLW" "$CC_RST" "$1"; WARN=$((WARN+1)); }
step() { printf '\n%s== %s ==%s\n' "$CC_BLU" "$1" "$CC_RST"; }
info() { printf ' %s%s%s\n' "$CC_DIM" "$1" "$CC_RST"; }
die()  { printf '%sFATAL: %s%s\n' "$CC_RED" "$1" "$CC_RST" >&2; cleanup; exit 2; }

assert_eq() {
  if [[ "$1" == "$2" ]]; then pass "$3 ($1)"; else fail "$3: got '$1', expected '$2'"; fi
}

http_code() {
  local method="$1" path="$2" data="${3:-}"
  if [[ -n "$data" ]]; then
    curl -s -o /dev/null -w '%{http_code}' -X "$method" "$GATEWAY$path" \
      -H 'Content-Type: application/json' -d "$data"
  else
    curl -s -o /dev/null -w '%{http_code}' -X "$method" "$GATEWAY$path"
  fi
}

# query_scalar <sql> -> first column of first row as string, sentinel on empty/error
query_scalar() {
  local sql="$1"
  local payload
  payload=$(python3 -c 'import json,sys; print(json.dumps({"sql": sys.argv[1]}))' "$sql")
  curl -s -X POST "$GATEWAY/query/sql" \
    -H 'Content-Type: application/json' \
    -d "$payload" \
    | python3 -c '
import sys, json
try:
    r = json.load(sys.stdin)
except Exception:
    print("__PARSE_ERROR__"); sys.exit(0)
if isinstance(r, dict) and "error" in r:
    sys.stderr.write("query error: " + str(r["error"]) + "\n")
    print("__ERROR__"); sys.exit(0)
rows = r.get("rows") if isinstance(r, dict) else None
if not rows:
    print("__NO_ROWS__"); sys.exit(0)
row = rows[0]
print(next(iter(row.values())))
'
}

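# Helper usage, for orientation (the dataset name is illustrative only):
#   CODE=$(http_code GET "/catalog/datasets")               # -> HTTP status, e.g. "200"
#   N=$(query_scalar "SELECT COUNT(*) FROM some_dataset")   # -> scalar value, or a __*__ sentinel
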
cleanup() {
  [[ "$KEEP_DATA" == "1" ]] && { info "KEEP_DATA=1 — leaving $WORKDIR"; return; }
  info "cleaning up test datasets for $RUN_ID"

  # Catch any previous-run zombies too: any catalog entry whose name
  # starts with "e2e_" is definitionally ours. Using DELETE (added for
  # this script's needs) purges both the live registry and the manifest
  # file atomically, so the next run doesn't trip on zombie entries
  # pointing at parquets we've already rm'd.
  local names
  names=$(curl -s "$GATEWAY/catalog/datasets" 2>/dev/null \
    | python3 -c "
import sys, json
try: ds = json.load(sys.stdin)
except Exception: sys.exit(0)
for d in ds:
    if d['name'].startswith('e2e_'):
        print(d['name'])
" 2>/dev/null || true)
  local removed=0
  for n in $names; do
    curl -s -o /dev/null -X DELETE "$GATEWAY/catalog/datasets/by-name/$n" && removed=$((removed+1))
  done

  # Delete any stray parquet + vector artifacts we can positively
  # attribute to an e2e_ prefix.
  rm -f "$DATA_ROOT/datasets/"e2e_*.parquet 2>/dev/null || true
  rm -f "$DATA_ROOT/vectors/"e2e_*.parquet 2>/dev/null || true
  rm -rf "$WORKDIR" 2>/dev/null || true
  info "deleted $removed e2e datasets (covers this run + any prior zombies)"
}

trap cleanup EXIT

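# If a run aborts before the trap fires, leftovers can also be removed by hand
# with the same endpoint cleanup() uses (the name below is a made-up example):
#   curl -X DELETE "$GATEWAY/catalog/datasets/by-name/e2e_1700000000_candidates"
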
# ============================================================
# 0. Preflight
# ============================================================
step "0. Preflight"

curl -sf -m 3 "$GATEWAY/health" >/dev/null 2>&1 || die "gateway not reachable at $GATEWAY"
pass "gateway /health (200)"

SIDECAR_UP=0
if curl -sf -m 3 "$SIDECAR/health" >/dev/null 2>&1; then
  SIDECAR_UP=1; pass "sidecar /health (200)"
else
  warn "sidecar unreachable — vector stage will be skipped"
  SKIP_VECTOR=1
fi

# Purge any e2e_* zombies from prior runs (stale registry entries that
# would otherwise break DataFusion schema inference for every query).
ZOMBIES=$(curl -s "$GATEWAY/catalog/datasets" 2>/dev/null \
  | python3 -c "
import sys, json
try: ds = json.load(sys.stdin)
except Exception: sys.exit(0)
for d in ds:
    if d['name'].startswith('e2e_'):
        print(d['name'])
" 2>/dev/null || true)
if [[ -n "$ZOMBIES" ]]; then
  ZCOUNT=$(echo "$ZOMBIES" | wc -l | tr -d ' ')
  for n in $ZOMBIES; do
    curl -s -o /dev/null -X DELETE "$GATEWAY/catalog/datasets/by-name/$n"
  done
  info "pre-cleaned $ZCOUNT e2e_ zombies from prior runs"
fi

BASELINE=$(curl -s "$GATEWAY/catalog/datasets" | python3 -c 'import sys,json; print(len(json.load(sys.stdin)))')
info "baseline dataset count: $BASELINE"

# ============================================================
# 1. Generate realistic data
# ============================================================
step "1. Generate realistic staffing data"

mkdir -p "$WORKDIR"
# Seed with RUN_ID (which embeds the wall-clock timestamp) so each run
# produces different content. Otherwise the content-hash dedup from
# Phase 6.4 keys off a stale hash that lingers in the live registry
# until the next gateway restart, and subsequent runs silently dedupe.
python3 - "$WORKDIR" "$RUN_ID" <<'PYEOF'
import csv, json, random, sys, os
workdir, run_id = sys.argv[1], sys.argv[2]
# Mix RUN_ID into the seed so content differs per run, but keep it
# deterministic within a single run.
random.seed(hash(run_id) & 0x7FFFFFFF)

FIRST = ['Aisha','Brandon','Carlos','Daria','Eli','Fiona','Gabriel','Hana','Ian','Julia',
         'Kofi','Lena','Mateo','Nadia','Oscar','Priya','Quinn','Raj','Sofia','Tomas',
         'Uma','Victor','Wendy','Xander','Yuki','Zara']
LAST = ['Adams','Brown','Chen','Davis','Evans','Fisher','Garcia','Hughes','Ibrahim','Johnson',
        'Kim','Lopez','Martinez','Nguyen','Ortiz','Patel','Rossi','Singh','Thomas','Umar',
        'Vargas','Williams','Xu','Young','Zhang','OConnor']
PLACES = [('Chicago','IL'),('New York','NY'),('San Francisco','CA'),('Austin','TX'),
          ('Seattle','WA'),('Denver','CO'),('Boston','MA'),('Atlanta','GA'),
          ('Miami','FL'),('Phoenix','AZ')]
SKILL_GROUPS = [
    ['Python','AWS','Docker'],['Java','Spring','Kubernetes'],
    ['React','TypeScript','Node'],['Go','PostgreSQL','gRPC'],
    ['Rust','DataFusion','Parquet'],['C#','.NET','Azure'],
    ['Ruby','Rails','Redis'],['Scala','Spark','Kafka'],
    ['Swift','iOS','CoreData'],['Kotlin','Android','Jetpack'],
]
STATUSES = ['active','placed','inactive','blocked']
STATUS_WEIGHTS = [60, 25, 10, 5]

with open(os.path.join(workdir, 'candidates.csv'), 'w', newline='') as f:
    w = csv.DictWriter(f, fieldnames=[
        'candidate_id','first_name','last_name','email','phone',
        'city','state','skills','years_experience','hourly_rate_usd','status'])
    w.writeheader()
    for i in range(1, 1001):
        fn, ln = random.choice(FIRST), random.choice(LAST)
        city, state = random.choice(PLACES)
        w.writerow({
            'candidate_id': f'CAND-{i:05d}',
            'first_name': fn, 'last_name': ln,
            'email': f'{fn.lower()}.{ln.lower()}{i}@example.com',
            'phone': f'({random.randint(200,999)}) {random.randint(200,999)}-{random.randint(1000,9999)}',
            'city': city, 'state': state,
            'skills': ','.join(random.choice(SKILL_GROUPS)),
            'years_experience': random.randint(0, 20),
            'hourly_rate_usd': random.randint(35, 185),
            'status': random.choices(STATUSES, weights=STATUS_WEIGHTS)[0],
        })

CLIENTS = ['Acme Corp','Globex','Initech','Umbrella','Wayne Enterprises',
           'Stark Industries','Tyrell','Cyberdyne','Massive Dynamic','Oscorp']
with open(os.path.join(workdir, 'placements.ndjson'), 'w') as f:
    for i in range(1, 201):
        f.write(json.dumps({
            'placement_id': f'PLACE-{i:04d}',
            'candidate_id': f'CAND-{random.randint(1,1000):05d}',
            'client': random.choice(CLIENTS),
            'start_date': f'2026-{random.randint(1,4):02d}-{random.randint(1,28):02d}',
            'weekly_hours': random.choice([20,25,30,35,40]),
            'bill_rate': random.randint(80, 250),
            'placement_status': random.choice(['active','completed','terminated']),
        }) + '\n')

RESUMES = [
    'Senior Python engineer with 8 years of cloud infrastructure experience. Expert in AWS, Docker, and distributed systems design. Led migration of monolithic legacy system to microservices.',
    'Full-stack React and TypeScript developer specializing in real-time dashboards. Built financial trading interfaces. GraphQL, WebSocket, performance optimization.',
    'Data engineer with deep Apache Spark and Kafka expertise. Seven years on streaming analytics pipelines processing billions of events per day. Scala and Python.',
    'Embedded systems engineer with C++ and Rust experience. Worked on automotive ADAS systems and industrial IoT devices. Low-level firmware, RTOS.',
    'DevOps engineer with Kubernetes and Terraform expertise. Six years at hypergrowth startups. Prometheus, Grafana, and observability tooling.',
    'Machine learning engineer specializing in NLP. Built production transformer-based systems. PyTorch, Hugging Face, fine-tuning large language models.',
    'iOS developer with Swift and SwiftUI. Four years building consumer apps at mid-size tech companies. Offline-first architectures and CoreData.',
    'Backend Go developer focused on high-throughput APIs. Built payment processing systems handling millions of transactions. PostgreSQL, gRPC, Redis.',
    'Security engineer with penetration testing and threat modeling experience. OSCP certified. Web application security, AppSec code review, SAST and DAST tooling.',
    'Site reliability engineer with Linux internals and performance tuning expertise. Ten years at large-scale infrastructure. Tracing, profiling, kernel-level debugging.',
]
with open(os.path.join(workdir, 'resumes.ndjson'), 'w') as f:
    for i, r in enumerate(RESUMES, 1):
        f.write(json.dumps({'doc_id': f'RES-{i:03d}', 'resume_text': r}) + '\n')
PYEOF

pass "candidates.csv (1000 rows, 11 cols)"
pass "placements.ndjson (200 rows, 7 cols)"
pass "resumes.ndjson (10 rows, 2 cols)"

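# For orientation, one generated candidate row looks roughly like this
# (values are random per run; the line below is illustrative, not real output):
#   CAND-00042,Priya,Chen,priya.chen42@example.com,(312) 555-5182,Chicago,IL,"Python,AWS,Docker",7,120,active
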
# ============================================================
# 2. CSV ingest
# ============================================================
step "2. CSV ingest (Phase 6.1)"

R=$(curl -s -X POST "$GATEWAY/ingest/file?name=$CAND_DS" -F "file=@$WORKDIR/candidates.csv")
echo "$R" | python3 -c 'import sys,json; json.load(sys.stdin)' 2>/dev/null \
  || { fail "ingest response was not JSON: $(echo "$R" | head -c 200)"; R='{}'; }

ROWS=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("rows",-1))' 2>/dev/null)
DEDUP=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("deduplicated","?"))' 2>/dev/null)
DS_NAME=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("dataset_name","?"))' 2>/dev/null)
assert_eq "$DS_NAME" "$CAND_DS" "ingest respected ?name= query param"
assert_eq "$ROWS" "1000" "ingest rows"
assert_eq "$DEDUP" "False" "first upload not deduplicated"

REG_ROWS=$(curl -s "$GATEWAY/catalog/datasets/by-name/$CAND_DS" \
  | python3 -c 'import sys,json; print(json.load(sys.stdin).get("row_count","null"))')
assert_eq "$REG_ROWS" "1000" "manifest row_count reflects ingest"

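# Ingest response shape assumed by the parsing above (keys taken from the
# .get() calls; the service may return additional fields):
#   {"dataset_name": "<name>", "rows": 1000, "deduplicated": false, ...}
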
# ============================================================
# 3. NDJSON ingest
# ============================================================
step "3. NDJSON ingest (Phase 6.2)"

R=$(curl -s -X POST "$GATEWAY/ingest/file?name=$PLACE_DS" -F "file=@$WORKDIR/placements.ndjson")
ROWS=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("rows",-1))' 2>/dev/null)
assert_eq "$ROWS" "200" "placements NDJSON ingest rows"

R=$(curl -s -X POST "$GATEWAY/ingest/file?name=$RESUME_DS" -F "file=@$WORKDIR/resumes.ndjson")
ROWS=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("rows",-1))' 2>/dev/null)
assert_eq "$ROWS" "10" "resumes NDJSON ingest rows"

# ============================================================
# 4. SQL queries + JOIN + cache
# ============================================================
step "4. SQL queries (Phase 2, Phase 8)"

N=$(query_scalar "SELECT COUNT(*) FROM $CAND_DS")
assert_eq "$N" "1000" "candidates COUNT(*)"

N=$(query_scalar "SELECT COUNT(*) FROM $CAND_DS WHERE status = 'active'")
if [[ "$N" =~ ^[0-9]+$ ]] && (( N > 400 && N < 700 )); then
  pass "active candidates in plausible range ($N, expect ~600)"
else
  fail "active candidates count out of range: $N"
fi

N=$(query_scalar "
  SELECT COUNT(DISTINCT c.candidate_id)
  FROM $CAND_DS c
  JOIN $PLACE_DS p ON c.candidate_id = p.candidate_id
  WHERE p.placement_status = 'active'
")
if [[ "$N" =~ ^[0-9]+$ ]] && (( N > 0 && N <= 200 )); then
  pass "cross-dataset JOIN with filter returns $N rows"
else
  fail "JOIN returned unexpected count: $N"
fi

AVG=$(query_scalar "SELECT AVG(hourly_rate_usd) FROM $CAND_DS")
if python3 -c "import sys; v=float('$AVG'); sys.exit(0 if 100 < v < 130 else 1)" 2>/dev/null; then
  pass "average hourly rate in plausible range ($AVG, expect ~110)"
else
  fail "average hourly rate out of range: $AVG"
fi

CODE=$(http_code POST "/query/cache/pin" "{\"dataset\":\"$CAND_DS\"}")
assert_eq "$CODE" "200" "cache pin HTTP"

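# The raw /query/sql contract exercised above (request/response shape inferred
# from query_scalar's parsing; <dataset> is a placeholder):
#   curl -s -X POST "$GATEWAY/query/sql" -H 'Content-Type: application/json' \
#        -d '{"sql":"SELECT COUNT(*) FROM <dataset>"}'
#   # => {"rows": [{"<column>": <value>, ...}, ...], ...}
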
# ============================================================
# 5. Content-hash re-ingest dedup (Phase 6.4)
# ============================================================
step "5. Content-hash re-ingest dedup"

R=$(curl -s -X POST "$GATEWAY/ingest/file?name=$CAND_DS" -F "file=@$WORKDIR/candidates.csv")
DEDUP=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("deduplicated","?"))' 2>/dev/null)
assert_eq "$DEDUP" "True" "re-upload same file is deduplicated"

# ============================================================
# 6. Idempotent register — same fingerprint (ADR-020)
# ============================================================
step "6. Idempotent register (ADR-020 same-fp path)"

DS=$(curl -s "$GATEWAY/catalog/datasets/by-name/$CAND_DS")
FP=$(echo "$DS" | python3 -c 'import sys,json; print(json.load(sys.stdin)["schema_fingerprint"])')
OBJS=$(echo "$DS" | python3 -c 'import sys,json,json as j; print(j.dumps(json.load(sys.stdin)["objects"]))')
ID_BEFORE=$(echo "$DS" | python3 -c 'import sys,json; print(json.load(sys.stdin)["id"])')

PAYLOAD=$(python3 -c "import json,sys; print(json.dumps({'name':sys.argv[1],'schema_fingerprint':sys.argv[2],'objects':json.loads(sys.argv[3])}))" "$CAND_DS" "$FP" "$OBJS")
CODE=$(http_code POST "/catalog/datasets" "$PAYLOAD")
assert_eq "$CODE" "201" "same-fp re-register returns 201"

ID_AFTER=$(curl -s "$GATEWAY/catalog/datasets/by-name/$CAND_DS" | python3 -c 'import sys,json; print(json.load(sys.stdin)["id"])')
assert_eq "$ID_AFTER" "$ID_BEFORE" "same DatasetId after re-register"

COUNT=$(curl -s "$GATEWAY/catalog/datasets" | python3 -c "import sys,json; print(sum(1 for d in json.load(sys.stdin) if d['name']=='$CAND_DS'))")
assert_eq "$COUNT" "1" "no duplicate manifest created"

# ============================================================
# 7. Schema-drift rejection (409)
# ============================================================
step "7. Schema-drift rejection (ADR-020 409 path)"

PAYLOAD=$(python3 -c "import json,sys; print(json.dumps({'name':sys.argv[1],'schema_fingerprint':'deadbeefnotmatching','objects':json.loads(sys.argv[2])}))" "$CAND_DS" "$OBJS")
CODE=$(http_code POST "/catalog/datasets" "$PAYLOAD")
assert_eq "$CODE" "409" "different-fp rejected with 409"

# ============================================================
# 8. Dedupe no-op on clean catalog
# ============================================================
step "8. Dedupe no-op on clean state"

R=$(curl -s -X POST "$GATEWAY/catalog/dedupe")
GROUPS=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin)["groups"])')
REMOVED=$(echo "$R" | python3 -c 'import sys,json; print(json.load(sys.stdin)["removed"])')
assert_eq "$GROUPS" "0" "dedupe groups (clean catalog)"
assert_eq "$REMOVED" "0" "dedupe removed count"

# ============================================================
# 9. Metadata enrichment (Phase 10)
# ============================================================
step "9. Metadata enrichment (Phase 10)"

CODE=$(http_code POST "/catalog/datasets/by-name/$CAND_DS/metadata" \
  "{\"owner\":\"e2e-test\",\"description\":\"$RUN_ID synthetic candidates\",\"tags\":[\"test\",\"synthetic\"]}")
assert_eq "$CODE" "200" "POST metadata HTTP"

META=$(curl -s "$GATEWAY/catalog/datasets/by-name/$CAND_DS")
OWNER=$(echo "$META" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("owner",""))')
assert_eq "$OWNER" "e2e-test" "owner persisted"

# ============================================================
# 10. PII auto-detection (Phase 10)
# ============================================================
step "10. PII auto-detection (Phase 10)"

PII_COLS=$(echo "$META" | python3 -c '
import sys, json
m = json.load(sys.stdin)
pii = [c["name"] for c in m.get("columns",[]) if c.get("is_pii") or (isinstance(c.get("sensitivity"),str) and c["sensitivity"].lower()=="pii")]
print(" ".join(pii) if pii else "__NONE__")')
if [[ "$PII_COLS" == *"email"* ]] && [[ "$PII_COLS" == *"phone"* ]]; then
  pass "email and phone flagged as PII ($PII_COLS)"
elif [[ "$PII_COLS" == "__NONE__" ]]; then
  warn "no PII flagged — auto-detection may not run on this path"
else
  warn "partial PII detection: $PII_COLS"
fi

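# Column-metadata shape assumed by the PII check above (either flag satisfies
# the filter; real payloads may carry more keys):
#   {"columns": [{"name": "email", "is_pii": true},
#                {"name": "phone", "sensitivity": "PII"}, ...]}
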
# ============================================================
# 11. Vector index + semantic search (Phase 7)
# ============================================================
step "11. Vector index + semantic search (Phase 7)"

if [[ "$SKIP_VECTOR" == "1" ]]; then
  warn "SKIP_VECTOR=1 — skipping vector pipeline"
else
  # Pull documents out of the ingested resumes dataset via SQL,
  # then feed to the inline /vectors/index body. This exercises
  # the query→embed integration rather than pre-canned input.
  DOCS=$(curl -s -X POST "$GATEWAY/query/sql" \
    -H 'Content-Type: application/json' \
    -d "$(python3 -c "import json; print(json.dumps({'sql': 'SELECT doc_id, resume_text FROM $RESUME_DS'}))")" \
    | python3 -c '
import sys, json
r = json.load(sys.stdin)
docs = [{"id": row["doc_id"], "text": row["resume_text"]} for row in r.get("rows", [])]
print(json.dumps(docs))')
  DOC_COUNT=$(echo "$DOCS" | python3 -c 'import sys,json; print(len(json.load(sys.stdin)))')
  assert_eq "$DOC_COUNT" "10" "pulled docs via SQL for embedding"

  PAYLOAD=$(python3 -c "
import json, sys
print(json.dumps({
    'index_name': sys.argv[1],
    'source': sys.argv[2],
    'documents': json.loads(sys.argv[3]),
    'chunk_size': 500,
    'overlap': 50,
}))" "$VEC_IDX" "$RESUME_DS" "$DOCS")

  R=$(curl -s -X POST "$GATEWAY/vectors/index" -H 'Content-Type: application/json' -d "$PAYLOAD")
  JOB_ID=$(echo "$R" | python3 -c 'import sys,json; d=json.load(sys.stdin); print(d.get("job_id","__NONE__"))' 2>/dev/null)

  if [[ "$JOB_ID" == "__NONE__" || -z "$JOB_ID" ]]; then
    fail "vector index job rejected: $(echo "$R" | head -c 200)"
  else
    pass "embedding job accepted (job=$JOB_ID)"
    # Poll up to 90s for 10 short resumes; Ollama cold-start can be slow.
    JOB_STATUS="unknown"
    for _ in $(seq 1 45); do
      JOB_STATUS=$(curl -s "$GATEWAY/vectors/jobs/$JOB_ID" 2>/dev/null \
        | python3 -c '
import sys, json
try: print(json.load(sys.stdin).get("status","?"))
except Exception: print("?")' 2>/dev/null)
      [[ "$JOB_STATUS" == "completed" || "$JOB_STATUS" == "Completed" ]] && break
      [[ "$JOB_STATUS" == "failed" || "$JOB_STATUS" == "Failed" ]] && break
      sleep 2
    done

    case "$JOB_STATUS" in
      completed|Completed)
        pass "embedding job completed"
        R=$(curl -s -X POST "$GATEWAY/vectors/search" \
          -H 'Content-Type: application/json' \
          -d "{\"index_name\":\"$VEC_IDX\",\"query\":\"fine-tuning large language models\",\"k\":3}")
        TOP_DOC=$(echo "$R" | python3 -c '
import sys, json
r = json.load(sys.stdin)
if r.get("results"): print(r["results"][0].get("doc_id","?"))
else: print("__NONE__")' 2>/dev/null)
        if [[ "$TOP_DOC" == "RES-006" ]]; then
          pass "top match is ML/NLP resume (semantically correct)"
        elif [[ "$TOP_DOC" == "__NONE__" ]]; then
          fail "search returned no results"
        else
          warn "top match is $TOP_DOC (expected RES-006 — ranking may vary)"
        fi ;;
      *)
        fail "embedding job did not complete (status=$JOB_STATUS)" ;;
    esac
  fi
fi

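# Search response shape assumed above (inferred from the TOP_DOC parsing;
# any score fields are not relied on):
#   {"results": [{"doc_id": "RES-006", ...}, ...]}
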
# ============================================================
# 12. Cleanup + baseline verify
# ============================================================
step "12. Cleanup + baseline verify"

cleanup
trap - EXIT

ON_DISK=$(ls "$DATA_ROOT/_catalog/manifests"/*.json 2>/dev/null | wc -l | tr -d ' ')
info "manifest files on disk now: $ON_DISK"

DISK_ORPHANS=0
if compgen -G "$DATA_ROOT/_catalog/manifests/*.json" > /dev/null; then
  DISK_ORPHANS=$(grep -l "\"$RUN_ID" "$DATA_ROOT/_catalog/manifests"/*.json 2>/dev/null | wc -l | tr -d ' ')
fi
assert_eq "$DISK_ORPHANS" "0" "no orphan manifest files on disk for $RUN_ID"

LIVE_ORPHANS=$(curl -s "$GATEWAY/catalog/datasets" \
  | python3 -c "import sys,json; print(sum(1 for d in json.load(sys.stdin) if d['name'].startswith('$RUN_ID')))")
if [[ "$LIVE_ORPHANS" != "0" ]]; then
  warn "$LIVE_ORPHANS entries linger in live registry (clears on gateway restart; on-disk is ground truth)"
fi

# ============================================================
# Summary
# ============================================================
ELAPSED=$(( $(date +%s) - STARTED_AT ))
printf '\n%s─── Summary ───%s\n' "$CC_BLU" "$CC_RST"
printf ' run_id: %s\n' "$RUN_ID"
printf ' elapsed: %ss\n' "$ELAPSED"
printf ' passed: %s%d%s\n' "$CC_GRN" "$PASS" "$CC_RST"
printf ' failed: %s%d%s\n' "$CC_RED" "$FAIL" "$CC_RST"
printf ' warnings: %s%d%s\n' "$CC_YLW" "$WARN" "$CC_RST"

if (( FAIL > 0 )); then
  printf '\n%sfailures:%s\n' "$CC_RED" "$CC_RST"
  for f in "${FAILURES[@]}"; do printf ' - %s\n' "$f"; done
  exit 1
fi
exit 0