ROOT CAUSE: Python scripts polled status.get("processed", 0) but the
Rust Job struct serialized as "embedded_chunks". Scripts always saw 0,
looped forever printing "unknown: 0/50000" for 8+ hours.
Fix (both sides):
- Rust: added "processed" alias field + "total" field to Job struct,
kept in sync on every update_progress() and complete() call
- Python: fixed autonomous_agent.py and overnight_proof.sh to read
"embedded_chunks" as primary key
The actual embedding pipeline was working the whole time — 673K real
chunks embedded overnight. Only the monitoring was blind.
One-word bug, 8 hours of zombie output. This is why you test the
monitoring, not just the pipeline.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
533 lines
20 KiB
Python
533 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""Autonomous stress-test agent for the Lakehouse substrate.
|
|
|
|
Ingests the full Postgres knowledge_base, embeds it, creates profiles
|
|
for both Parquet+HNSW and Lance backends, runs recursive playbooks
|
|
that test every capability, captures errors, hot-swaps profiles to
|
|
compare model/backend configurations, and iterates.
|
|
|
|
Designed to run unattended. All state goes through the Lakehouse API
|
|
so the agent IS a real consumer of the substrate.
|
|
"""
|
|
|
|
import json, time, sys, os, traceback
|
|
from datetime import datetime
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import HTTPError
|
|
|
|
BASE = "http://localhost:3100"
|
|
PG_DSN = "postgresql://kbuser@localhost:5432/knowledge_base"
|
|
|
|
# ─── HTTP helpers ───
|
|
|
|
def api(method, path, body=None, timeout=300):
    """Call the Lakehouse HTTP API and return decoded JSON.

    Args:
        method: HTTP verb ("GET", "POST", "DELETE", ...).
        path: URL path appended to BASE (e.g. "/query/sql").
        body: optional JSON-serializable payload.
        timeout: socket timeout in seconds.

    Returns:
        Parsed JSON dict on success, {"ok": True} for an empty response
        body, or a dict with an "error" key (plus "status" for HTTP
        errors). Never raises — callers check for "error" instead.
    """
    # `is not None`, not truthiness: an explicit empty payload ({} / [])
    # must still be serialized and sent, not silently dropped.
    data = json.dumps(body).encode() if body is not None else None
    req = Request(f"{BASE}{path}", data=data, method=method,
                  headers={"Content-Type": "application/json"})
    try:
        resp = urlopen(req, timeout=timeout)
        raw = resp.read()
        return json.loads(raw) if raw.strip() else {"ok": True}
    except HTTPError as e:
        # Surface the server's error text (truncated) and status code.
        return {"error": e.read().decode()[:500], "status": e.code}
    except Exception as e:
        # Network-level failures (refused, timeout) become error dicts too.
        return {"error": str(e)}
|
|
|
|
def post(path, body=None, **kw):
    """POST `body` (JSON) to `path`; see api() for return semantics."""
    return api("POST", path, body, **kw)


def get(path, **kw):
    """GET `path`; see api() for return semantics."""
    return api("GET", path, **kw)


def delete(path, **kw):
    """DELETE `path`; see api() for return semantics."""
    return api("DELETE", path, **kw)
|
|
|
|
# ─── Error pipeline ───
|
|
|
|
class ErrorPipeline:
    """Captures every error, writes them back to the lakehouse as a
    queryable dataset. Errors become data — the agent reads its own
    failures to decide what to fix next."""

    def __init__(self):
        # Chronological capture order is preserved in both lists.
        self.errors = []    # list of dicts (see capture())
        self.warnings = []  # list of plain strings

    def capture(self, phase, operation, error, context=None):
        """Record one failure and echo it to stdout.

        Args:
            phase: coarse stage name ("ingest", "embed", "stress", ...).
            operation: the specific step that failed.
            error: exception or message; stringified and truncated to 500.
            context: optional JSON-serializable extra detail.
        """
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "phase": phase,
            "operation": operation,
            "error": str(error)[:500],
            # Truncated so one giant payload can't bloat the error dataset.
            "context": json.dumps(context)[:500] if context else "",
        }
        self.errors.append(entry)
        print(f" ⚠ ERROR [{phase}] {operation}: {str(error)[:100]}")

    def warn(self, msg):
        """Record a non-fatal warning and echo it."""
        self.warnings.append(msg)
        print(f" ⚡ {msg}")

    def flush_to_lakehouse(self):
        """Write errors as a CSV-style ingest so they're queryable.

        Best-effort: if the API is unreachable the failure is downgraded
        to a warning — losing the error dataset must never crash the run
        after all the real work is done (flush is called outside any
        playbook, so nothing else would catch the exception).
        """
        if not self.errors:
            return
        # Build a simple CSV in memory
        import io, csv
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=["timestamp", "phase", "operation", "error", "context"])
        writer.writeheader()
        writer.writerows(self.errors)
        csv_bytes = buf.getvalue().encode()

        # Ingest via multipart (hand-rolled body; stdlib has no helper)
        import http.client
        boundary = "----LakehouseErrorBoundary"
        body = (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; filename="agent_errors.csv"\r\n'
            f"Content-Type: text/csv\r\n\r\n"
        ).encode() + csv_bytes + f"\r\n--{boundary}--\r\n".encode()

        try:
            conn = http.client.HTTPConnection("localhost", 3100)
            conn.request("POST", "/ingest/file?name=agent_errors", body=body,
                         headers={"Content-Type": f"multipart/form-data; boundary={boundary}"})
            resp = conn.getresponse()
            resp.read()
            conn.close()
            print(f" → Flushed {len(self.errors)} errors to agent_errors dataset")
        except Exception as e:
            self.warn(f"flush_to_lakehouse failed: {str(e)[:100]}")

    def summary(self):
        """One-line count of captured errors and warnings."""
        return f"{len(self.errors)} errors, {len(self.warnings)} warnings"
|
|
|
|
# ─── Playbook runner ───
|
|
|
|
class PlaybookRunner:
    """Runs named playbooks (lists of steps) and keeps a scorecard."""

    def __init__(self, errors: "ErrorPipeline"):
        self.errors = errors   # shared error sink
        self.results = []      # one dict per recorded check
        self.iteration = 0     # bumped by the outer loop in main()

    def record(self, name, passed, detail, ms=None):
        """Append one pass/fail result to the scorecard and print it.

        `ms is not None` (not truthiness) so a legitimate 0 ms timing is
        still displayed instead of being silently dropped.
        """
        self.results.append({"name": name, "passed": passed, "detail": detail,
                             "ms": ms, "iteration": self.iteration})
        icon = "✓" if passed else "✗"
        ms_s = f" ({ms:.0f}ms)" if ms is not None else ""
        print(f" {icon} {name}{ms_s}: {detail}")

    def run_playbook(self, name, steps):
        """Run a named playbook — list of (step_name, callable) pairs.

        A step that raises is captured and recorded as a failure; the
        remaining steps still run.
        """
        print(f"\n{'─'*50}")
        print(f" Playbook: {name} (iteration {self.iteration})")
        print(f"{'─'*50}")
        for step_name, fn in steps:
            try:
                fn()
            except Exception as e:
                self.errors.capture(name, step_name, e)
                self.record(step_name, False, str(e)[:80])
|
|
|
|
# ─── Phase 1: Ingest everything from Postgres ───
|
|
|
|
def phase_ingest(runner: PlaybookRunner, errors: ErrorPipeline):
    """Phase 1: pull every knowledge_base table into the lakehouse."""
    tables = [
        "team_runs", "response_cache", "response_cache_history",
        "pipeline_runs", "meta_runs", "meta_pipelines",
        "threat_intel", "memory_entries", "self_reports", "lab_trials",
    ]

    def ingest_all():
        # Ingest each Postgres table as a kb_<table> dataset, summing rows.
        grand_total = 0
        for tbl in tables:
            started = time.time()
            resp = post("/ingest/db", {
                "dsn": PG_DSN,
                "table": tbl,
                "dataset_name": f"kb_{tbl}",
            })
            elapsed = (time.time() - started) * 1000
            if "error" in resp:
                errors.capture("ingest", tbl, resp["error"])
                runner.record(f"ingest {tbl}", False, resp["error"][:60], elapsed)
                continue
            n = resp.get("rows", 0)
            grand_total += n
            runner.record(f"ingest {tbl}", True, f"{n} rows", elapsed)
        runner.record("total ingest", grand_total > 500, f"{grand_total} rows across {len(tables)} tables")

    runner.run_playbook("Phase 1: Postgres Ingest", [("ingest_all_tables", ingest_all)])
|
|
|
|
# ─── Phase 2: Embed text-heavy tables ───
|
|
|
|
def phase_embed(runner: PlaybookRunner, errors: ErrorPipeline):
    """Phase 2: embed the text-heavy kb_* datasets into vector indexes.

    Builds docs from SQL results, POSTs them to /vectors/index, and (for
    team_runs) polls the background embedding job until completed, failed,
    or timed out. Progress is read from "embedded_chunks" — the key the
    Rust Job struct serializes — with "processed" as a fallback alias, so
    the monitor works regardless of which server version is deployed.
    """

    # Embed team_runs (prompts + responses are text-rich)
    def embed_team_runs():
        # First build docs from the dataset
        t0 = time.time()
        r = post("/query/sql", {"sql": """
            SELECT id, prompt, mode FROM kb_team_runs LIMIT 200
        """})
        if "error" in r:
            errors.capture("embed", "fetch_team_runs", r["error"])
            return

        docs = []
        for row in r.get("rows", []):
            doc_id = f"tr-{row['id']}"
            text = f"[{row.get('mode','')}] {row.get('prompt','')}"
            if len(text) > 20:  # skip near-empty prompts
                docs.append({"id": doc_id, "text": text})

        if not docs:
            errors.warn("no docs to embed from team_runs")
            return

        # Create vector index
        r = post("/vectors/index", {
            "index_name": "kb_team_runs_agent",
            "source": "kb_team_runs",
            "documents": docs,
            "chunk_size": 500,
            "overlap": 50,
        }, timeout=600)
        ms = (time.time() - t0) * 1000

        if "error" in r:
            errors.capture("embed", "create_index", r["error"])
            runner.record("embed team_runs", False, r["error"][:60], ms)
            return  # no job to wait on when index creation failed

        chunks = r.get("chunks", 0)
        runner.record("embed team_runs", True, f"{chunks} chunks, job={r.get('job_id','?')}", ms)

        # Wait for background embedding job
        job_id = r.get("job_id")
        if job_id:
            print(f" ⏳ waiting for embedding job {job_id}...")
            progress = 0
            for _ in range(120):  # 10 min max
                time.sleep(5)
                status = get(f"/vectors/jobs/{job_id}")
                state = status.get("status", "unknown")
                # "embedded_chunks" is what the Rust Job struct serializes;
                # "processed" is its newer alias — read both so monitoring
                # never goes blind on a key rename again.
                progress = status.get("embedded_chunks", status.get("processed", 0))
                if state == "completed":
                    runner.record("embedding complete", True, f"{progress} chunks embedded")
                    break
                elif state == "failed":
                    errors.capture("embed", "job_failed", status.get("error", "unknown"))
                    runner.record("embedding complete", False, "job failed")
                    break
                sys.stdout.write(f"\r ⏳ {state}: {progress} chunks...")
                sys.stdout.flush()
            else:
                # Loop exhausted without a terminal state: record the
                # timeout explicitly instead of silently moving on.
                errors.capture("embed", "job_timeout", f"job {job_id} still running after 600s")
                runner.record("embedding complete", False, f"timeout, last progress={progress}")
            print()

    def embed_response_cache():
        # Same pattern for response_cache; fire-and-record (no job wait).
        t0 = time.time()
        r = post("/query/sql", {"sql": """
            SELECT id, prompt, mode FROM kb_response_cache LIMIT 100
        """})
        if "error" in r:
            errors.capture("embed", "fetch_response_cache", r["error"])
            return

        docs = [{"id": f"rc-{row['id']}", "text": f"[{row.get('mode','')}] {row.get('prompt','')}"}
                for row in r.get("rows", []) if len(row.get("prompt", "")) > 20]
        if not docs:
            errors.warn("no docs to embed from response_cache")
            return

        r = post("/vectors/index", {
            "index_name": "kb_response_cache_agent",
            "source": "kb_response_cache",
            "documents": docs,
        }, timeout=600)
        ms = (time.time() - t0) * 1000
        runner.record("embed response_cache", "error" not in r,
                      f"{r.get('chunks',0)} chunks" if "error" not in r else r["error"][:60], ms)

    runner.run_playbook("Phase 2: Embed Corpus", [
        ("embed_team_runs", embed_team_runs),
        ("embed_response_cache", embed_response_cache),
    ])
|
|
|
|
# ─── Phase 3: Create hot-swap profiles ───
|
|
|
|
def phase_profiles(runner: PlaybookRunner, errors: ErrorPipeline):
    """Phase 3: create both stress-test profiles and hot-swap activate each."""
    profiles = [
        {
            "id": "agent-parquet",
            "ollama_name": "qwen2.5:latest",
            "description": "Agent stress-test profile (Parquet+HNSW backend)",
            "bound_datasets": ["kb_team_runs", "kb_response_cache"],
            "vector_backend": "parquet",
        },
        {
            "id": "agent-lance",
            "ollama_name": "mistral:latest",
            "description": "Agent stress-test profile (Lance backend)",
            "bound_datasets": ["kb_team_runs", "kb_response_cache"],
            "vector_backend": "lance",
        },
    ]

    def create_profiles():
        # Recreate both profiles from scratch so reruns stay idempotent.
        for spec in profiles:
            delete(f"/catalog/profiles/{spec['id']}")  # drop any stale copy
            began = time.time()
            resp = post("/catalog/profiles", spec)
            took = (time.time() - began) * 1000
            if "error" in resp:
                errors.capture("profiles", f"create {spec['id']}", resp["error"])
                runner.record(f"create {spec['id']}", False, resp["error"][:60], took)
            else:
                runner.record(f"create {spec['id']}", True,
                              f"backend={spec['vector_backend']} model={spec['ollama_name']}", took)

    def activate_parquet():
        # Activate the Parquet+HNSW profile; report warmed index stats.
        began = time.time()
        resp = post("/vectors/profile/agent-parquet/activate", timeout=600)
        took = (time.time() - began) * 1000
        if "error" in resp:
            errors.capture("profiles", "activate_parquet", resp["error"])
            runner.record("activate parquet profile", False, resp["error"][:60], took)
            return
        n_warmed = len(resp.get("indexes_warmed", []))
        n_vecs = resp.get("total_vectors", 0)
        runner.record("activate parquet profile", True,
                      f"warmed={n_warmed} vectors={n_vecs} preloaded={resp.get('model_preloaded')}", took)

    def activate_lance():
        # Hot-swap to the Lance profile; report which profile it displaced.
        began = time.time()
        resp = post("/vectors/profile/agent-lance/activate", timeout=600)
        took = (time.time() - began) * 1000
        if "error" in resp:
            errors.capture("profiles", "activate_lance", resp["error"])
            runner.record("activate lance profile", False, resp["error"][:60], took)
            return
        n_warmed = len(resp.get("indexes_warmed", []))
        n_vecs = resp.get("total_vectors", 0)
        runner.record("activate lance profile", True,
                      f"warmed={n_warmed} vectors={n_vecs} model_swap={resp.get('previous_profile')}", took)

    runner.run_playbook("Phase 3: Hot-Swap Profiles", [
        ("create_profiles", create_profiles),
        ("activate_parquet", activate_parquet),
        ("activate_lance", activate_lance),
    ])
|
|
|
|
# ─── Phase 4: Stress queries across backends ───
|
|
|
|
def phase_stress(runner: PlaybookRunner, errors: ErrorPipeline):
    """Phase 4: hammer SQL, both vector backends, and the RAG pipeline."""
    queries = [
        "Which models performed best on complex prompts?",
        "Find pipeline runs that failed or had errors",
        "What types of prompts get the highest quality scores?",
        "Show me threat intelligence related to authentication attacks",
        "Which adaptive pipeline runs had the most model escalations?",
    ]

    def _search_profile(backend):
        # Shared loop: first three queries against the active profile.
        for i, q in enumerate(queries[:3]):
            began = time.time()
            resp = post(f"/vectors/profile/agent-{backend}/search", {
                "index_name": "kb_team_runs_agent",
                "query": q,
                "top_k": 3,
            })
            took = (time.time() - began) * 1000
            if "error" in resp:
                errors.capture("stress", f"{backend}_search_{i}", resp["error"])
                runner.record(f"{backend} search: {q[:40]}", False, resp["error"][:60], took)
            else:
                method = resp.get("method", "?")
                hits = len(resp.get("results", []))
                runner.record(f"{backend} search: {q[:40]}", True, f"{method} {hits} hits", took)

    def stress_sql():
        # Fixed battery of SQL checks against the kb_* datasets.
        checks = [
            ("KB dataset count", "SELECT COUNT(*) FROM kb_team_runs"),
            ("Score distribution", "SELECT mode, AVG(quality_score) avg_score, COUNT(*) cnt FROM kb_team_runs GROUP BY mode ORDER BY avg_score DESC LIMIT 10"),
            ("Pipeline status", "SELECT status, COUNT(*) FROM kb_pipeline_runs GROUP BY status"),
            ("Cache hit rates", "SELECT SUM(hit_count) total_hits, COUNT(*) entries FROM kb_response_cache"),
        ]
        for label, stmt in checks:
            began = time.time()
            resp = post("/query/sql", {"sql": stmt})
            took = (time.time() - began) * 1000
            if "error" in resp:
                errors.capture("stress", label, resp["error"])
                runner.record(label, False, resp["error"][:60], took)
            else:
                runner.record(label, True, f"{resp.get('row_count',0)} rows", took)

    def stress_vector_parquet():
        """Search through the Parquet profile."""
        post("/vectors/profile/agent-parquet/activate", timeout=300)
        _search_profile("parquet")

    def stress_vector_lance():
        """Hot-swap to Lance, run same queries."""
        post("/vectors/profile/agent-lance/activate", timeout=300)
        vram = get("/ai/vram")
        loaded = [m["name"] for m in vram.get("ollama_loaded", [])] if isinstance(vram, dict) else []
        runner.record("VRAM after swap", True, f"models={loaded}")
        _search_profile("lance")

    def stress_rag():
        """Full RAG pipeline — question → embed → search → generate."""
        for q in queries[:2]:
            began = time.time()
            resp = post("/vectors/rag", {
                "index_name": "kb_team_runs_agent",
                "question": q,
                "top_k": 3,
            }, timeout=120)
            took = (time.time() - began) * 1000
            if "error" in resp:
                errors.capture("stress", "rag", resp["error"])
                runner.record(f"RAG: {q[:40]}", False, resp["error"][:60], took)
            else:
                answer = resp.get("answer", "")
                n_sources = len(resp.get("sources", []))
                runner.record(f"RAG: {q[:40]}", len(answer) > 10,
                              f"{len(answer)} chars, {n_sources} sources", took)

    runner.run_playbook("Phase 4: Stress Test", [
        ("sql_queries", stress_sql),
        ("vector_parquet", stress_vector_parquet),
        ("vector_lance", stress_vector_lance),
        ("rag_pipeline", stress_rag),
    ])
|
|
|
|
# ─── Phase 5: Recursive iteration ───
|
|
|
|
def phase_iterate(runner: PlaybookRunner, errors: ErrorPipeline):
    """Read the error pipeline, generate new test scenarios targeting
    the weak spots, run them. This is the recursive playbook."""

    def analyze_errors():
        # Nothing captured → nothing to iterate on.
        if not errors.errors:
            runner.record("error analysis", True, "no errors to iterate on — clean run!")
            return

        # Group errors by phase
        buckets = {}
        for entry in errors.errors:
            buckets.setdefault(entry["phase"], []).append(entry)
        for phase_name, bucket in buckets.items():
            runner.record(f"errors in {phase_name}", False, f"{len(bucket)} failures")

        # Ask the LLM to analyze the error patterns (first 10 only)
        error_summary = "\n".join(
            f"- [{e['phase']}] {e['operation']}: {e['error'][:100]}"
            for e in errors.errors[:10]
        )
        began = time.time()
        resp = post("/ai/generate", {
            "prompt": f"""Analyze these Lakehouse system errors and suggest what to fix:

{error_summary}

Be specific: name the endpoint, the likely root cause, and a concrete fix. Keep it under 200 words.""",
            "model": "qwen2.5",
            "max_tokens": 500,
        })
        took = (time.time() - began) * 1000
        if "error" in resp:
            return
        analysis = resp.get("text", "")
        runner.record("LLM error analysis", True, f"{len(analysis)} chars", took)
        print(f"\n ┌─ LLM Analysis ─────────────────────────")
        for line in analysis.strip().split("\n")[:10]:
            print(f" │ {line}")
        print(f" └────────────────────────────────────────\n")

    def retry_failures():
        """Re-run operations that failed, to see if they were transient."""
        if not errors.errors:
            return
        attempted = 0
        recovered = 0
        for entry in errors.errors[:5]:
            # Only stress-phase search failures are retryable here.
            if entry["phase"] != "stress" or "search" not in entry["operation"]:
                continue
            attempted += 1
            resp = post("/vectors/hnsw/search", {
                "index_name": "kb_team_runs_agent",
                "query": "test retry",
                "top_k": 2,
            })
            if "error" not in resp:
                recovered += 1
        if attempted:
            runner.record("retry failures", recovered > 0,
                          f"{recovered}/{attempted} fixed on retry")

    runner.run_playbook("Phase 5: Recursive Iteration", [
        ("analyze_errors", analyze_errors),
        ("retry_failures", retry_failures),
    ])
|
|
|
|
# ─── Main ───
|
|
|
|
def main():
    """Drive the full agent run: ingest → embed → profiles → stress → iterate.

    Returns:
        Process exit code: 0 when at least 80% of recorded checks passed,
        1 otherwise.
    """
    print("=" * 60)
    print("AUTONOMOUS LAKEHOUSE AGENT")
    print(f"Started: {datetime.utcnow().isoformat()}")
    print(f"Target: {BASE}")
    print(f"Postgres: {PG_DSN}")
    print("=" * 60)

    errors = ErrorPipeline()
    runner = PlaybookRunner(errors)

    # Iteration loop
    for iteration in range(2):  # 2 iterations for now
        runner.iteration = iteration
        print(f"\n{'═'*60}")
        print(f" ITERATION {iteration}")
        print(f"{'═'*60}")

        # Ingest + embed are expensive; only do them on the first pass.
        if iteration == 0:
            phase_ingest(runner, errors)
            phase_embed(runner, errors)

        phase_profiles(runner, errors)
        phase_stress(runner, errors)
        phase_iterate(runner, errors)

    # Flush errors to the lakehouse
    errors.flush_to_lakehouse()

    # Final scorecard
    print(f"\n{'═'*60}")
    print(f" FINAL SCORECARD")
    print(f"{'═'*60}")
    passed = sum(1 for r in runner.results if r["passed"])
    total = len(runner.results)
    pct = passed / total * 100 if total else 0
    print(f"\n {passed}/{total} passed ({pct:.0f}%)")
    print(f" Error pipeline: {errors.summary()}")
    print(f"\n {'Test':<50} {'ms':>8} {'Iter':>4} {'Status':>6}")
    print(f" {'-'*72}")
    for r in runner.results:
        # `is not None` so a real 0 ms timing prints as "0", not "—".
        ms = f"{r['ms']:.0f}" if r['ms'] is not None else "—"
        status = "PASS" if r["passed"] else "FAIL"
        print(f" {r['name']:<50} {ms:>8} {r['iteration']:>4} {status:>6}")

    failed = [r for r in runner.results if not r["passed"]]
    if failed:
        print(f"\n FAILURES ({len(failed)}):")
        for r in failed:
            print(f" [{r['iteration']}] {r['name']}: {r['detail'][:80]}")

    return 0 if pct >= 80 else 1
|
|
|
|
if __name__ == "__main__":
    # Exit code doubles as the pass/fail signal for the calling shell script.
    sys.exit(main())
|