#!/usr/bin/env python3
"""Autonomous stress-test agent for the Lakehouse substrate.

Ingests the full Postgres knowledge_base, embeds it, creates profiles for
both Parquet+HNSW and Lance backends, runs recursive playbooks that test
every capability, captures errors, hot-swaps profiles to compare
model/backend configurations, and iterates.

Designed to run unattended. All state goes through the Lakehouse API so the
agent IS a real consumer of the substrate.
"""

import csv
import io
import json
import os
import sys
import time
import traceback
from datetime import datetime, timezone
from urllib.error import HTTPError
from urllib.request import Request, urlopen

BASE = "http://localhost:3100"
PG_DSN = "postgresql://kbuser@localhost:5432/knowledge_base"


def _utc_now_iso():
    """Return the current UTC time as a naive ISO-8601 string."""
    # datetime.utcnow() is deprecated; stripping tzinfo keeps the historical
    # timestamp format (no "+00:00" suffix) so stored rows stay comparable.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


# ─── HTTP helpers ───

def api(method, path, body=None, timeout=300):
    """Send a JSON request to the Lakehouse API and return the decoded reply.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        path: URL path (including any query string) appended to BASE.
        body: JSON-serializable payload, or None for a request without body.
        timeout: Socket timeout in seconds.

    Returns:
        The decoded JSON response; {"ok": True} when the response body is
        empty; {"error": ..., "status": ...} on an HTTP error status; or
        {"error": ...} on any other failure. Never raises.
    """
    # "is not None" so an explicitly empty payload ({}) is still transmitted.
    data = json.dumps(body).encode() if body is not None else None
    req = Request(
        f"{BASE}{path}",
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )
    try:
        # The context manager closes the connection even if read() fails.
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
        return json.loads(raw) if raw.strip() else {"ok": True}
    except HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not raise from here.
        return {"error": e.read().decode(errors="replace")[:500], "status": e.code}
    except Exception as e:
        # Boundary of the agent: every transport failure becomes data.
        return {"error": str(e)}


def post(path, body=None, **kw):
    """POST `body` as JSON to `path`; see `api` for the return contract."""
    return api("POST", path, body, **kw)


def get(path, **kw):
    """GET `path`; see `api` for the return contract."""
    return api("GET", path, **kw)


def delete(path, **kw):
    """DELETE `path`; see `api` for the return contract."""
    return api("DELETE", path, **kw)


# ─── Error pipeline ───

class ErrorPipeline:
    """Captures every error, writes them back to the lakehouse as a queryable dataset.

    Errors become data — the agent reads its own failures to decide what to
    fix next.
    """

    def __init__(self):
        self.errors = []    # list of dicts, one per captured error
        self.warnings = []  # list of plain warning strings

    def capture(self, phase, operation, error, context=None):
        """Record one error (truncated to 500 chars) and echo it to stdout.

        Args:
            phase: Name of the phase/playbook the error occurred in.
            operation: Name of the failing step.
            error: Exception or message; stored as str(error)[:500].
            context: Optional extra data, stored as JSON text ([:500]).
        """
        entry = {
            "timestamp": _utc_now_iso(),
            "phase": phase,
            "operation": operation,
            "error": str(error)[:500],
            # default=str: capturing an error must never raise just because
            # the context holds something json cannot encode.
            "context": json.dumps(context, default=str)[:500] if context else "",
        }
        self.errors.append(entry)
        print(f" ⚠ ERROR [{phase}] {operation}: {str(error)[:100]}")

    def warn(self, msg):
        """Record a non-fatal warning and echo it to stdout."""
        self.warnings.append(msg)
        print(f" ⚡ {msg}")

    def flush_to_lakehouse(self):
        """Write errors as a CSV-style ingest so they're queryable.

        Best-effort: a failed upload is recorded as a warning and never
        raises, so the caller can still print its final scorecard.
        """
        if not self.errors:
            return

        # Build a simple CSV in memory
        buf = io.StringIO()
        writer = csv.DictWriter(
            buf, fieldnames=["timestamp", "phase", "operation", "error", "context"]
        )
        writer.writeheader()
        writer.writerows(self.errors)
        csv_bytes = buf.getvalue().encode()

        # Ingest via multipart
        boundary = "----LakehouseErrorBoundary"
        body = (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; filename="agent_errors.csv"\r\n'
            f"Content-Type: text/csv\r\n\r\n"
        ).encode() + csv_bytes + f"\r\n--{boundary}--\r\n".encode()

        # Target BASE (not a hard-coded host/port) so the flush follows the
        # same server as every other request the agent makes.
        req = Request(
            f"{BASE}/ingest/file?name=agent_errors",
            data=body,
            method="POST",
            headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
        )
        try:
            with urlopen(req, timeout=60) as resp:
                resp.read()
        except HTTPError as e:
            self.warn(f"error flush rejected by lakehouse: HTTP {e.code}")
            return
        except Exception as e:
            # Same boundary policy as api(): report, do not crash the run.
            self.warn(f"error flush failed: {e}")
            return
        print(f" → Flushed {len(self.errors)} errors to agent_errors dataset")

    def summary(self):
        """Return a one-line count of captured errors and warnings."""
        return f"{len(self.errors)} errors, {len(self.warnings)} warnings"


# ─── Playbook runner ───

class PlaybookRunner:
    """Runs named playbooks and accumulates pass/fail results."""

    def __init__(self, errors: ErrorPipeline):
        self.errors = errors
        self.results = []   # list of result dicts appended by record()
        self.iteration = 0  # set by main() before each iteration

    def record(self, name, passed, detail, ms=None):
        """Append one test result and print a one-line status.

        Args:
            name: Test name shown in the scorecard.
            passed: Whether the test passed.
            detail: Short human-readable detail string.
            ms: Optional duration in milliseconds.
        """
        self.results.append({
            "name": name,
            "passed": passed,
            "detail": detail,
            "ms": ms,
            "iteration": self.iteration,
        })
        icon = "✓" if passed else "✗"
        # "is not None" so a genuine 0 ms measurement is still displayed.
        ms_s = f" ({ms:.0f}ms)" if ms is not None else ""
        print(f" {icon} {name}{ms_s}: {detail}")

    def run_playbook(self, name, steps):
        """Run a named playbook — list of (step_name, callable) pairs.

        An exception raised by a step is captured in the error pipeline and
        recorded as a failed result; the remaining steps still run.
        """
        print(f"\n{'─'*50}")
        print(f" Playbook: {name} (iteration {self.iteration})")
        print(f"{'─'*50}")
        for step_name, fn in steps:
            try:
                fn()
            except Exception as e:
                self.errors.capture(name, step_name, e)
                self.record(step_name, False, str(e)[:80])


# ─── Phase 1: Ingest everything from Postgres ───

def phase_ingest(runner: PlaybookRunner, errors: ErrorPipeline):
    """Ingest each knowledge_base table into a `kb_<table>` dataset."""
    tables = [
        "team_runs", "response_cache", "response_cache_history",
        "pipeline_runs", "meta_runs", "meta_pipelines",
        "threat_intel", "memory_entries", "self_reports", "lab_trials",
    ]

    def ingest_all():
        total_rows = 0
        for table in tables:
            t0 = time.time()
            r = post("/ingest/db", {
                "dsn": PG_DSN,
                "table": table,
                "dataset_name": f"kb_{table}",
            })
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("ingest", table, r["error"])
                runner.record(f"ingest {table}", False, r["error"][:60], ms)
            else:
                rows = r.get("rows", 0)
                total_rows += rows
                runner.record(f"ingest {table}", True, f"{rows} rows", ms)
        # The run only counts as a pass with more than 500 rows in total.
        runner.record("total ingest", total_rows > 500,
                      f"{total_rows} rows across {len(tables)} tables")

    runner.run_playbook("Phase 1: Postgres Ingest", [("ingest_all_tables", ingest_all)])


# ─── Phase 2: Embed text-heavy tables ───

def phase_embed(runner: PlaybookRunner, errors: ErrorPipeline):
    """Build vector indexes over kb_team_runs and kb_response_cache."""

    # Embed team_runs (prompts + responses are text-rich)
    def embed_team_runs():
        # First build docs from the dataset
        t0 = time.time()
        r = post("/query/sql", {"sql": " SELECT id, prompt, mode FROM kb_team_runs LIMIT 200 "})
        if "error" in r:
            errors.capture("embed", "fetch_team_runs", r["error"])
            return

        docs = []
        for row in r.get("rows", []):
            doc_id = f"tr-{row['id']}"
            # "or ''": a NULL column arrives as None and must not be
            # embedded as the literal text "None".
            text = f"[{row.get('mode') or ''}] {row.get('prompt') or ''}"
            if len(text) > 20:
                docs.append({"id": doc_id, "text": text})
        if not docs:
            errors.warn("no docs to embed from team_runs")
            return

        # Create vector index
        r = post("/vectors/index", {
            "index_name": "kb_team_runs_agent",
            "source": "kb_team_runs",
            "documents": docs,
            "chunk_size": 500,
            "overlap": 50,
        }, timeout=600)
        ms = (time.time() - t0) * 1000
        if "error" in r:
            errors.capture("embed", "create_index", r["error"])
            runner.record("embed team_runs", False, r["error"][:60], ms)
            return

        chunks = r.get("chunks", 0)
        runner.record("embed team_runs", True,
                      f"{chunks} chunks, job={r.get('job_id','?')}", ms)

        # Wait for background embedding job
        job_id = r.get("job_id")
        if not job_id:
            return
        print(f" ⏳ waiting for embedding job {job_id}...")
        for _ in range(120):  # 10 min max
            time.sleep(5)
            status = get(f"/vectors/jobs/{job_id}")
            state = status.get("status", "unknown")
            progress = status.get("processed", 0)
            if state == "completed":
                runner.record("embedding complete", True, f"{progress} chunks embedded")
                break
            if state == "failed":
                errors.capture("embed", "job_failed", status.get("error", "unknown"))
                runner.record("embedding complete", False, "job failed")
                break
            sys.stdout.write(f"\r ⏳ {state}: {progress} chunks...")
            sys.stdout.flush()
        else:
            # The loop ran out without a terminal state: surface the timeout
            # instead of silently continuing with a half-built index.
            errors.capture("embed", "job_timeout",
                           f"job {job_id} still '{state}' after 10 minutes")
            runner.record("embedding complete", False, "timed out waiting for job")
        print()

    def embed_response_cache():
        t0 = time.time()
        r = post("/query/sql", {"sql": " SELECT id, prompt, mode FROM kb_response_cache LIMIT 100 "})
        if "error" in r:
            errors.capture("embed", "fetch_response_cache", r["error"])
            return

        docs = []
        for row in r.get("rows", []):
            # .get("prompt", "") still yields None for a NULL column, and
            # len(None) raises; normalise before measuring.
            prompt = row.get("prompt") or ""
            if len(prompt) > 20:
                docs.append({
                    "id": f"rc-{row['id']}",
                    "text": f"[{row.get('mode') or ''}] {prompt}",
                })
        if not docs:
            errors.warn("no docs to embed from response_cache")
            return

        r = post("/vectors/index", {
            "index_name": "kb_response_cache_agent",
            "source": "kb_response_cache",
            "documents": docs,
        }, timeout=600)
        ms = (time.time() - t0) * 1000
        ok = "error" not in r
        detail = f"{r.get('chunks',0)} chunks" if ok else r["error"][:60]
        runner.record("embed response_cache", ok, detail, ms)

    runner.run_playbook("Phase 2: Embed Corpus", [
        ("embed_team_runs", embed_team_runs),
        ("embed_response_cache", embed_response_cache),
    ])


# ─── Phase 3: Create hot-swap profiles ───

def phase_profiles(runner: PlaybookRunner, errors: ErrorPipeline):
    """Create the parquet and lance profiles, then activate each in turn."""
    profiles = [
        {
            "id": "agent-parquet",
            "ollama_name": "qwen2.5:latest",
            "description": "Agent stress-test profile (Parquet+HNSW backend)",
            "bound_datasets": ["kb_team_runs", "kb_response_cache"],
            "vector_backend": "parquet",
        },
        {
            "id": "agent-lance",
            "ollama_name": "mistral:latest",
            "description": "Agent stress-test profile (Lance backend)",
            "bound_datasets": ["kb_team_runs", "kb_response_cache"],
            "vector_backend": "lance",
        },
    ]

    def create_profiles():
        for p in profiles:
            # Delete if exists (idempotent)
            delete(f"/catalog/profiles/{p['id']}")
            t0 = time.time()
            r = post("/catalog/profiles", p)
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("profiles", f"create {p['id']}", r["error"])
                runner.record(f"create {p['id']}", False, r["error"][:60], ms)
            else:
                runner.record(f"create {p['id']}", True,
                              f"backend={p['vector_backend']} model={p['ollama_name']}", ms)

    def activate(profile_id, operation, label, extra_label, extra_key):
        # Shared body of both activation steps; only the reported extra
        # field differs between the parquet and lance variants.
        t0 = time.time()
        r = post(f"/vectors/profile/{profile_id}/activate", timeout=600)
        ms = (time.time() - t0) * 1000
        if "error" in r:
            errors.capture("profiles", operation, r["error"])
            runner.record(label, False, r["error"][:60], ms)
            return
        warmed = len(r.get("indexes_warmed", []))
        vecs = r.get("total_vectors", 0)
        runner.record(label, True,
                      f"warmed={warmed} vectors={vecs} {extra_label}={r.get(extra_key)}", ms)

    def activate_parquet():
        activate("agent-parquet", "activate_parquet", "activate parquet profile",
                 "preloaded", "model_preloaded")

    def activate_lance():
        activate("agent-lance", "activate_lance", "activate lance profile",
                 "model_swap", "previous_profile")

    runner.run_playbook("Phase 3: Hot-Swap Profiles", [
        ("create_profiles", create_profiles),
        ("activate_parquet", activate_parquet),
        ("activate_lance", activate_lance),
    ])


# ─── Phase 4: Stress queries across backends ───

def phase_stress(runner: PlaybookRunner, errors: ErrorPipeline):
    """Run SQL, vector-search (both backends) and RAG queries."""
    queries = [
        "Which models performed best on complex prompts?",
        "Find pipeline runs that failed or had errors",
        "What types of prompts get the highest quality scores?",
        "Show me threat intelligence related to authentication attacks",
        "Which adaptive pipeline runs had the most model escalations?",
    ]

    def stress_sql():
        sqls = [
            ("KB dataset count", "SELECT COUNT(*) FROM kb_team_runs"),
            ("Score distribution", "SELECT mode, AVG(quality_score) avg_score, COUNT(*) cnt FROM kb_team_runs GROUP BY mode ORDER BY avg_score DESC LIMIT 10"),
            ("Pipeline status", "SELECT status, COUNT(*) FROM kb_pipeline_runs GROUP BY status"),
            ("Cache hit rates", "SELECT SUM(hit_count) total_hits, COUNT(*) entries FROM kb_response_cache"),
        ]
        for name, sql in sqls:
            t0 = time.time()
            r = post("/query/sql", {"sql": sql})
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("stress", name, r["error"])
                runner.record(name, False, r["error"][:60], ms)
            else:
                runner.record(name, True, f"{r.get('row_count',0)} rows", ms)

    def activate_for_search(backend, profile_id):
        # A failed activation used to be discarded; capture it so a later
        # run of search failures can be traced back to its cause.
        r = post(f"/vectors/profile/{profile_id}/activate", timeout=300)
        if "error" in r:
            errors.capture("stress", f"{backend}_activate", r["error"])

    def run_searches(backend, profile_id):
        # The same three queries are run through whichever profile is given.
        for i, q in enumerate(queries[:3]):
            t0 = time.time()
            r = post(f"/vectors/profile/{profile_id}/search", {
                "index_name": "kb_team_runs_agent",
                "query": q,
                "top_k": 3,
            })
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("stress", f"{backend}_search_{i}", r["error"])
                runner.record(f"{backend} search: {q[:40]}", False, r["error"][:60], ms)
            else:
                method = r.get("method", "?")
                hits = len(r.get("results", []))
                runner.record(f"{backend} search: {q[:40]}", True, f"{method} {hits} hits", ms)

    def stress_vector_parquet():
        """Search through the Parquet profile."""
        activate_for_search("parquet", "agent-parquet")
        run_searches("parquet", "agent-parquet")

    def stress_vector_lance():
        """Hot-swap to Lance, run same queries."""
        activate_for_search("lance", "agent-lance")
        vram = get("/ai/vram")
        loaded = [m["name"] for m in vram.get("ollama_loaded", [])] if isinstance(vram, dict) else []
        runner.record("VRAM after swap", True, f"models={loaded}")
        run_searches("lance", "agent-lance")

    def stress_rag():
        """Full RAG pipeline — question → embed → search → generate."""
        for q in queries[:2]:
            t0 = time.time()
            r = post("/vectors/rag", {
                "index_name": "kb_team_runs_agent",
                "question": q,
                "top_k": 3,
            }, timeout=120)
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("stress", "rag", r["error"])
                runner.record(f"RAG: {q[:40]}", False, r["error"][:60], ms)
            else:
                answer = r.get("answer", "")
                sources = len(r.get("sources", []))
                # An answer of 10 characters or fewer counts as a failure.
                runner.record(f"RAG: {q[:40]}", len(answer) > 10,
                              f"{len(answer)} chars, {sources} sources", ms)

    runner.run_playbook("Phase 4: Stress Test", [
        ("sql_queries", stress_sql),
        ("vector_parquet", stress_vector_parquet),
        ("vector_lance", stress_vector_lance),
        ("rag_pipeline", stress_rag),
    ])


# ─── Phase 5: Recursive iteration ───

def phase_iterate(runner: PlaybookRunner, errors: ErrorPipeline):
    """Read the error pipeline, generate new test scenarios targeting the weak
    spots, run them. This is the recursive playbook."""

    def analyze_errors():
        if not errors.errors:
            runner.record("error analysis", True, "no errors to iterate on — clean run!")
            return

        # Group errors by phase
        by_phase = {}
        for e in errors.errors:
            by_phase.setdefault(e["phase"], []).append(e)
        for phase, errs in by_phase.items():
            runner.record(f"errors in {phase}", False, f"{len(errs)} failures")

        # Ask the LLM to analyze the error patterns
        error_summary = "\n".join(
            f"- [{e['phase']}] {e['operation']}: {e['error'][:100]}"
            for e in errors.errors[:10]
        )
        t0 = time.time()
        r = post("/ai/generate", {
            "prompt": (
                "Analyze these Lakehouse system errors and suggest what to fix: "
                f"{error_summary} "
                "Be specific: name the endpoint, the likely root cause, and a "
                "concrete fix. Keep it under 200 words."
            ),
            "model": "qwen2.5",
            "max_tokens": 500,
        })
        ms = (time.time() - t0) * 1000
        if "error" in r:
            # The analysis is advisory: report that it was unavailable but do
            # not count it as a test failure.
            errors.warn(f"LLM error analysis unavailable: {r['error'][:100]}")
            return
        analysis = r.get("text", "") or ""
        runner.record("LLM error analysis", True, f"{len(analysis)} chars", ms)
        print(f"\n ┌─ LLM Analysis ─────────────────────────")
        for line in analysis.strip().split("\n")[:10]:
            print(f" │ {line}")
        print(f" └────────────────────────────────────────\n")

    def retry_failures():
        """Re-run operations that failed, to see if they were transient."""
        if not errors.errors:
            return
        # Filter first, then cap: slicing before filtering meant search
        # failures sitting behind five earlier errors were never retried.
        search_failures = [
            e for e in errors.errors
            if e["phase"] == "stress" and "search" in e["operation"]
        ][:5]
        retried = 0
        fixed = 0
        for _ in search_failures:
            retried += 1
            r = post("/vectors/hnsw/search", {
                "index_name": "kb_team_runs_agent",
                "query": "test retry",
                "top_k": 2,
            })
            if "error" not in r:
                fixed += 1
        if retried:
            runner.record("retry failures", fixed > 0, f"{fixed}/{retried} fixed on retry")

    runner.run_playbook("Phase 5: Recursive Iteration", [
        ("analyze_errors", analyze_errors),
        ("retry_failures", retry_failures),
    ])


# ─── Main ───

def main():
    """Run every phase for two iterations and print the final scorecard.

    Returns:
        Process exit code: 0 when at least 80% of recorded tests passed,
        otherwise 1 (including when nothing was recorded).
    """
    print("=" * 60)
    print("AUTONOMOUS LAKEHOUSE AGENT")
    print(f"Started: {_utc_now_iso()}")
    print(f"Target: {BASE}")
    print(f"Postgres: {PG_DSN}")
    print("=" * 60)

    errors = ErrorPipeline()
    runner = PlaybookRunner(errors)

    # Iteration loop
    for iteration in range(2):  # 2 iterations for now
        runner.iteration = iteration
        print(f"\n{'═'*60}")
        print(f" ITERATION {iteration}")
        print(f"{'═'*60}")

        # Ingest/embed/profile setup only happens on the first pass.
        if iteration == 0:
            phase_ingest(runner, errors)
            phase_embed(runner, errors)
            phase_profiles(runner, errors)
        phase_stress(runner, errors)
        phase_iterate(runner, errors)

        # Flush errors to the lakehouse
        errors.flush_to_lakehouse()

    # Final scorecard
    print(f"\n{'═'*60}")
    print(f" FINAL SCORECARD")
    print(f"{'═'*60}")
    passed = sum(1 for r in runner.results if r["passed"])
    total = len(runner.results)
    pct = passed / total * 100 if total else 0
    print(f"\n {passed}/{total} passed ({pct:.0f}%)")
    print(f" Error pipeline: {errors.summary()}")

    print(f"\n {'Test':<50} {'ms':>8} {'Iter':>4} {'Status':>6}")
    print(f" {'-'*72}")
    for r in runner.results:
        ms = f"{r['ms']:.0f}" if r["ms"] is not None else "—"
        status = "PASS" if r["passed"] else "FAIL"
        print(f" {r['name']:<50} {ms:>8} {r['iteration']:>4} {status:>6}")

    failed = [r for r in runner.results if not r["passed"]]
    if failed:
        print(f"\n FAILURES ({len(failed)}):")
        for r in failed:
            print(f" [{r['iteration']}] {r['name']}: {r['detail'][:80]}")

    return 0 if pct >= 80 else 1


if __name__ == "__main__":
    sys.exit(main())