matrix-agent-validated/scripts/autonomous_agent.py
profit ac01fffd9a checkpoint: matrix-agent-validated (2026-04-25)
Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.

WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) is excluded — see REPLICATION.md for the
regeneration path. Full lakehouse history at git.agentview.dev/profit/lakehouse.

WHAT WAS PROVEN
- Vector retrieval across multi-corpora matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
    * UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
    * REVISE: chains versions, parent.superseded_at + superseded_by stamped
    * RETIRE: marks specific trace retired with reason, excluded from retrieval
    * HISTORY: walks chain root→tip, cycle-safe

KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces

Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:43:27 -05:00

533 lines
20 KiB
Python

#!/usr/bin/env python3
"""Autonomous stress-test agent for the Lakehouse substrate.
Ingests the full Postgres knowledge_base, embeds it, creates profiles
for both Parquet+HNSW and Lance backends, runs recursive playbooks
that test every capability, captures errors, hot-swaps profiles to
compare model/backend configurations, and iterates.
Designed to run unattended. All state goes through the Lakehouse API
so the agent IS a real consumer of the substrate.
"""
import json, time, sys, os, traceback
from datetime import datetime
from urllib.request import Request, urlopen
from urllib.error import HTTPError
BASE = "http://localhost:3100"
PG_DSN = "postgresql://kbuser@localhost:5432/knowledge_base"
# ─── HTTP helpers ───
def api(method, path, body=None, timeout=300):
    """Call the Lakehouse HTTP API and return the decoded JSON response.

    Args:
        method: HTTP verb ("GET", "POST", "DELETE", ...).
        path: URL path appended to BASE.
        body: optional JSON-serializable payload; sent when not None.
        timeout: socket timeout in seconds.

    Returns:
        Parsed JSON on success, {"ok": True} for an empty response body, or
        an {"error": ..., "status"?: ...} dict — callers check `"error" in r`.
        Never raises: all failures are surfaced as data.
    """
    # `is not None` (not truthiness) so an explicit empty payload {} is still sent.
    data = json.dumps(body).encode() if body is not None else None
    req = Request(f"{BASE}{path}", data=data, method=method,
                  headers={"Content-Type": "application/json"})
    try:
        # Context manager guarantees the socket is released even if read fails.
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
        return json.loads(raw) if raw.strip() else {"ok": True}
    except HTTPError as e:
        return {"error": e.read().decode()[:500], "status": e.code}
    except Exception as e:  # connection refused, timeout, bad JSON, ...
        return {"error": str(e)}
def post(path, body=None, **kw):
    """POST convenience wrapper around api()."""
    return api("POST", path, body, **kw)


def get(path, **kw):
    """GET convenience wrapper around api()."""
    return api("GET", path, **kw)


def delete(path, **kw):
    """DELETE convenience wrapper around api()."""
    return api("DELETE", path, **kw)
# ─── Error pipeline ───
class ErrorPipeline:
    """Captures every error, writes them back to the lakehouse as a
    queryable dataset. Errors become data — the agent reads its own
    failures to decide what to fix next."""

    def __init__(self):
        self.errors = []    # structured error entries (dicts)
        self.warnings = []  # free-form warning strings

    def capture(self, phase, operation, error, context=None):
        """Record one failure with enough context to analyze later.

        Args:
            phase: coarse bucket, e.g. "ingest", "embed", "stress".
            operation: the specific step that failed.
            error: exception or message; stringified and truncated to 500 chars.
            context: optional JSON-serializable extra detail.
        """
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "phase": phase,
            "operation": operation,
            "error": str(error)[:500],
            # `is not None` so falsy-but-real contexts ({} or []) are kept.
            "context": json.dumps(context)[:500] if context is not None else "",
        }
        self.errors.append(entry)
        print(f" ⚠ ERROR [{phase}] {operation}: {str(error)[:100]}")

    def warn(self, msg):
        """Record a non-fatal warning and echo it to the console."""
        self.warnings.append(msg)
        print(f"{msg}")

    def flush_to_lakehouse(self):
        """Write errors as a CSV-style ingest so they're queryable."""
        if not self.errors:
            return
        # Build a simple CSV in memory
        import io, csv
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=["timestamp", "phase", "operation", "error", "context"])
        writer.writeheader()
        writer.writerows(self.errors)
        csv_bytes = buf.getvalue().encode()
        # Ingest via hand-rolled multipart (stdlib has no multipart encoder).
        import http.client
        from urllib.parse import urlsplit
        boundary = "----LakehouseErrorBoundary"
        body = (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; filename="agent_errors.csv"\r\n'
            f"Content-Type: text/csv\r\n\r\n"
        ).encode() + csv_bytes + f"\r\n--{boundary}--\r\n".encode()
        # Derive host/port from BASE so this stays consistent with api()
        # instead of hardcoding localhost:3100 a second time.
        target = urlsplit(BASE)
        conn = http.client.HTTPConnection(target.hostname or "localhost",
                                          target.port or 3100)
        try:
            conn.request("POST", "/ingest/file?name=agent_errors", body=body,
                         headers={"Content-Type": f"multipart/form-data; boundary={boundary}"})
            resp = conn.getresponse()
            resp.read()
        finally:
            # Release the socket even when the request itself raises.
            conn.close()
        print(f" → Flushed {len(self.errors)} errors to agent_errors dataset")

    def summary(self):
        """One-line tally of captured errors and warnings."""
        return f"{len(self.errors)} errors, {len(self.warnings)} warnings"
# ─── Playbook runner ───
class PlaybookRunner:
    """Runs named playbooks (lists of steps) and records pass/fail results."""

    def __init__(self, errors: "ErrorPipeline"):
        self.errors = errors  # shared ErrorPipeline for step failures
        self.results = []     # one dict per recorded check
        self.iteration = 0    # bumped by the main loop between passes

    def record(self, name, passed, detail, ms=None):
        """Append one check result and echo it to the console."""
        self.results.append({"name": name, "passed": passed, "detail": detail,
                             "ms": ms, "iteration": self.iteration})
        # NOTE(review): the original pass/fail glyphs were lost to an encoding
        # strip (both branches were ""); restored as ✓/✗ — confirm against
        # the upstream file.
        icon = "✓" if passed else "✗"
        ms_s = f" ({ms:.0f}ms)" if ms else ""
        print(f" {icon} {name}{ms_s}: {detail}")

    def run_playbook(self, name, steps):
        """Run a named playbook — list of (step_name, callable) pairs.

        Exceptions from a step are captured (not raised) and recorded as
        failures, so one broken step never aborts the whole playbook.
        """
        print(f"\n{'─' * 50}")
        print(f" Playbook: {name} (iteration {self.iteration})")
        print(f"{'─' * 50}")
        for step_name, fn in steps:
            try:
                fn()
            except Exception as e:
                self.errors.capture(name, step_name, e)
                self.record(step_name, False, str(e)[:80])
# ─── Phase 1: Ingest everything from Postgres ───
def phase_ingest(runner: "PlaybookRunner", errors: "ErrorPipeline"):
    """Phase 1: pull every knowledge_base table into the lakehouse via /ingest/db."""
    tables = [
        "team_runs", "response_cache", "response_cache_history",
        "pipeline_runs", "meta_runs", "meta_pipelines",
        "threat_intel", "memory_entries", "self_reports", "lab_trials",
    ]

    def ingest_all():
        # Ingest each table under a kb_-prefixed dataset name; per-table
        # failures are recorded but don't stop the remaining tables.
        total_rows = 0
        for table in tables:
            t0 = time.time()
            r = post("/ingest/db", {
                "dsn": PG_DSN,
                "table": table,
                "dataset_name": f"kb_{table}",
            })
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("ingest", table, r["error"])
                runner.record(f"ingest {table}", False, r["error"][:60], ms)
            else:
                rows = r.get("rows", 0)
                total_rows += rows
                runner.record(f"ingest {table}", True, f"{rows} rows", ms)
        # Sanity floor: a healthy knowledge base should yield >500 rows total.
        runner.record("total ingest", total_rows > 500, f"{total_rows} rows across {len(tables)} tables")

    runner.run_playbook("Phase 1: Postgres Ingest", [("ingest_all_tables", ingest_all)])
# ─── Phase 2: Embed text-heavy tables ───
def phase_embed(runner: "PlaybookRunner", errors: "ErrorPipeline"):
    """Phase 2: build vector indexes over the text-rich ingested tables."""

    # Embed team_runs (prompts + responses are text-rich)
    def embed_team_runs():
        # First build docs from the dataset
        t0 = time.time()
        r = post("/query/sql", {"sql": """
            SELECT id, prompt, mode FROM kb_team_runs LIMIT 200
        """})
        if "error" in r:
            errors.capture("embed", "fetch_team_runs", r["error"])
            return
        docs = []
        for row in r.get("rows", []):
            doc_id = f"tr-{row['id']}"
            text = f"[{row.get('mode','')}] {row.get('prompt','')}"
            if len(text) > 20:  # skip near-empty prompts
                docs.append({"id": doc_id, "text": text})
        if not docs:
            errors.warn("no docs to embed from team_runs")
            return
        # Create vector index
        r = post("/vectors/index", {
            "index_name": "kb_team_runs_agent",
            "source": "kb_team_runs",
            "documents": docs,
            "chunk_size": 500,
            "overlap": 50,
        }, timeout=600)
        ms = (time.time() - t0) * 1000
        if "error" in r:
            errors.capture("embed", "create_index", r["error"])
            runner.record("embed team_runs", False, r["error"][:60], ms)
        else:
            chunks = r.get("chunks", 0)
            runner.record("embed team_runs", True, f"{chunks} chunks, job={r.get('job_id','?')}", ms)
            # Wait for background embedding job
            job_id = r.get("job_id")
            if job_id:
                print(f" ⏳ waiting for embedding job {job_id}...")
                for _ in range(120):  # 10 min max
                    time.sleep(5)
                    status = get(f"/vectors/jobs/{job_id}")
                    state = status.get("status", "unknown")
                    progress = status.get("embedded_chunks", 0)
                    if state == "completed":
                        runner.record("embedding complete", True, f"{progress} chunks embedded")
                        break
                    elif state == "failed":
                        errors.capture("embed", "job_failed", status.get("error", "unknown"))
                        runner.record("embedding complete", False, "job failed")
                        break
                    # Still running: overwrite the progress line in place.
                    sys.stdout.write(f"\r{state}: {progress} chunks...")
                    sys.stdout.flush()
                print()

    def embed_response_cache():
        # Same flow as team_runs but smaller and without job polling.
        t0 = time.time()
        r = post("/query/sql", {"sql": """
            SELECT id, prompt, mode FROM kb_response_cache LIMIT 100
        """})
        if "error" in r:
            errors.capture("embed", "fetch_response_cache", r["error"])
            return
        docs = [{"id": f"rc-{row['id']}", "text": f"[{row.get('mode','')}] {row.get('prompt','')}"}
                for row in r.get("rows", []) if len(row.get("prompt", "")) > 20]
        if not docs:
            errors.warn("no docs to embed from response_cache")
            return
        r = post("/vectors/index", {
            "index_name": "kb_response_cache_agent",
            "source": "kb_response_cache",
            "documents": docs,
        }, timeout=600)
        ms = (time.time() - t0) * 1000
        runner.record("embed response_cache", "error" not in r,
                      f"{r.get('chunks',0)} chunks" if "error" not in r else r["error"][:60], ms)

    runner.run_playbook("Phase 2: Embed Corpus", [
        ("embed_team_runs", embed_team_runs),
        ("embed_response_cache", embed_response_cache),
    ])
# ─── Phase 3: Create hot-swap profiles ───
def phase_profiles(runner: "PlaybookRunner", errors: "ErrorPipeline"):
    """Phase 3: create two hot-swappable profiles (Parquet+HNSW vs Lance)
    and activate each once to verify the swap path."""
    profiles = [
        {
            "id": "agent-parquet",
            "ollama_name": "qwen2.5:latest",
            "description": "Agent stress-test profile (Parquet+HNSW backend)",
            "bound_datasets": ["kb_team_runs", "kb_response_cache"],
            "vector_backend": "parquet",
        },
        {
            "id": "agent-lance",
            "ollama_name": "mistral:latest",
            "description": "Agent stress-test profile (Lance backend)",
            "bound_datasets": ["kb_team_runs", "kb_response_cache"],
            "vector_backend": "lance",
        },
    ]

    def create_profiles():
        for p in profiles:
            # Delete if exists (idempotent)
            delete(f"/catalog/profiles/{p['id']}")
            t0 = time.time()
            r = post("/catalog/profiles", p)
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("profiles", f"create {p['id']}", r["error"])
                runner.record(f"create {p['id']}", False, r["error"][:60], ms)
            else:
                runner.record(f"create {p['id']}", True,
                              f"backend={p['vector_backend']} model={p['ollama_name']}", ms)

    def activate_parquet():
        t0 = time.time()
        r = post("/vectors/profile/agent-parquet/activate", timeout=600)
        ms = (time.time() - t0) * 1000
        if "error" in r:
            errors.capture("profiles", "activate_parquet", r["error"])
            runner.record("activate parquet profile", False, r["error"][:60], ms)
        else:
            warmed = len(r.get("indexes_warmed", []))
            vecs = r.get("total_vectors", 0)
            runner.record("activate parquet profile", True,
                          f"warmed={warmed} vectors={vecs} preloaded={r.get('model_preloaded')}", ms)

    def activate_lance():
        t0 = time.time()
        r = post("/vectors/profile/agent-lance/activate", timeout=600)
        ms = (time.time() - t0) * 1000
        if "error" in r:
            errors.capture("profiles", "activate_lance", r["error"])
            runner.record("activate lance profile", False, r["error"][:60], ms)
        else:
            warmed = len(r.get("indexes_warmed", []))
            vecs = r.get("total_vectors", 0)
            runner.record("activate lance profile", True,
                          f"warmed={warmed} vectors={vecs} model_swap={r.get('previous_profile')}", ms)

    runner.run_playbook("Phase 3: Hot-Swap Profiles", [
        ("create_profiles", create_profiles),
        ("activate_parquet", activate_parquet),
        ("activate_lance", activate_lance),
    ])
# ─── Phase 4: Stress queries across backends ───
def phase_stress(runner: "PlaybookRunner", errors: "ErrorPipeline"):
    """Phase 4: hammer SQL queries, both vector backends, and the RAG pipeline."""
    queries = [
        "Which models performed best on complex prompts?",
        "Find pipeline runs that failed or had errors",
        "What types of prompts get the highest quality scores?",
        "Show me threat intelligence related to authentication attacks",
        "Which adaptive pipeline runs had the most model escalations?",
    ]

    def stress_sql():
        sqls = [
            ("KB dataset count", "SELECT COUNT(*) FROM kb_team_runs"),
            ("Score distribution", "SELECT mode, AVG(quality_score) avg_score, COUNT(*) cnt FROM kb_team_runs GROUP BY mode ORDER BY avg_score DESC LIMIT 10"),
            ("Pipeline status", "SELECT status, COUNT(*) FROM kb_pipeline_runs GROUP BY status"),
            ("Cache hit rates", "SELECT SUM(hit_count) total_hits, COUNT(*) entries FROM kb_response_cache"),
        ]
        for name, sql in sqls:
            t0 = time.time()
            r = post("/query/sql", {"sql": sql})
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("stress", name, r["error"])
                runner.record(name, False, r["error"][:60], ms)
            else:
                runner.record(name, True, f"{r.get('row_count',0)} rows", ms)

    def stress_vector_parquet():
        """Search through the Parquet profile."""
        post("/vectors/profile/agent-parquet/activate", timeout=300)
        for i, q in enumerate(queries[:3]):
            t0 = time.time()
            r = post("/vectors/profile/agent-parquet/search", {
                "index_name": "kb_team_runs_agent",
                "query": q,
                "top_k": 3,
            })
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("stress", f"parquet_search_{i}", r["error"])
                runner.record(f"parquet search: {q[:40]}", False, r["error"][:60], ms)
            else:
                method = r.get("method", "?")
                hits = len(r.get("results", []))
                runner.record(f"parquet search: {q[:40]}", True, f"{method} {hits} hits", ms)

    def stress_vector_lance():
        """Hot-swap to Lance, run same queries."""
        post("/vectors/profile/agent-lance/activate", timeout=300)
        # Snapshot which models are resident after the swap, for the scorecard.
        vram = get("/ai/vram")
        loaded = [m["name"] for m in vram.get("ollama_loaded", [])] if isinstance(vram, dict) else []
        runner.record("VRAM after swap", True, f"models={loaded}")
        for i, q in enumerate(queries[:3]):
            t0 = time.time()
            r = post("/vectors/profile/agent-lance/search", {
                "index_name": "kb_team_runs_agent",
                "query": q,
                "top_k": 3,
            })
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("stress", f"lance_search_{i}", r["error"])
                runner.record(f"lance search: {q[:40]}", False, r["error"][:60], ms)
            else:
                method = r.get("method", "?")
                hits = len(r.get("results", []))
                runner.record(f"lance search: {q[:40]}", True, f"{method} {hits} hits", ms)

    def stress_rag():
        """Full RAG pipeline — question → embed → search → generate."""
        for q in queries[:2]:
            t0 = time.time()
            r = post("/vectors/rag", {
                "index_name": "kb_team_runs_agent",
                "question": q,
                "top_k": 3,
            }, timeout=120)
            ms = (time.time() - t0) * 1000
            if "error" in r:
                errors.capture("stress", "rag", r["error"])
                runner.record(f"RAG: {q[:40]}", False, r["error"][:60], ms)
            else:
                answer = r.get("answer", "")
                sources = len(r.get("sources", []))
                # A real answer should be more than a few characters.
                runner.record(f"RAG: {q[:40]}", len(answer) > 10,
                              f"{len(answer)} chars, {sources} sources", ms)

    runner.run_playbook("Phase 4: Stress Test", [
        ("sql_queries", stress_sql),
        ("vector_parquet", stress_vector_parquet),
        ("vector_lance", stress_vector_lance),
        ("rag_pipeline", stress_rag),
    ])
# ─── Phase 5: Recursive iteration ───
def phase_iterate(runner: "PlaybookRunner", errors: "ErrorPipeline"):
    """Read the error pipeline, generate new test scenarios targeting
    the weak spots, run them. This is the recursive playbook."""

    def analyze_errors():
        if not errors.errors:
            runner.record("error analysis", True, "no errors to iterate on — clean run!")
            return
        # Group errors by phase
        by_phase = {}
        for e in errors.errors:
            by_phase.setdefault(e["phase"], []).append(e)
        for phase, errs in by_phase.items():
            runner.record(f"errors in {phase}", False, f"{len(errs)} failures")
        # Ask the LLM to analyze the error patterns
        error_summary = "\n".join(
            f"- [{e['phase']}] {e['operation']}: {e['error'][:100]}"
            for e in errors.errors[:10]
        )
        t0 = time.time()
        r = post("/ai/generate", {
            "prompt": f"""Analyze these Lakehouse system errors and suggest what to fix:
{error_summary}
Be specific: name the endpoint, the likely root cause, and a concrete fix. Keep it under 200 words.""",
            "model": "qwen2.5",
            "max_tokens": 500,
        })
        ms = (time.time() - t0) * 1000
        if "error" not in r:
            analysis = r.get("text", "")
            runner.record("LLM error analysis", True, f"{len(analysis)} chars", ms)
            print(f"\n ┌─ LLM Analysis ─────────────────────────")
            for line in analysis.strip().split("\n")[:10]:
                print(f"{line}")
            print(f" └────────────────────────────────────────\n")
        else:
            # Previously this failure path was silent; surface it explicitly.
            errors.capture("iterate", "llm_analysis", r["error"])
            runner.record("LLM error analysis", False, r["error"][:60], ms)

    def retry_failures():
        """Re-run operations that failed, to see if they were transient."""
        if not errors.errors:
            return
        retried = 0
        fixed = 0
        for e in errors.errors[:5]:
            # Only search failures have a cheap, safe retry path.
            if e["phase"] == "stress" and "search" in e["operation"]:
                retried += 1
                r = post("/vectors/hnsw/search", {
                    "index_name": "kb_team_runs_agent",
                    "query": "test retry",
                    "top_k": 2,
                })
                if "error" not in r:
                    fixed += 1
        if retried:
            runner.record("retry failures", fixed > 0,
                          f"{fixed}/{retried} fixed on retry")

    runner.run_playbook("Phase 5: Recursive Iteration", [
        ("analyze_errors", analyze_errors),
        ("retry_failures", retry_failures),
    ])
# ─── Main ───
def main():
    """Drive the full stress-test: ingest → embed → profiles → stress → iterate.

    Returns:
        Process exit code: 0 when at least 80% of checks passed, else 1.
    """
    print("=" * 60)
    print("AUTONOMOUS LAKEHOUSE AGENT")
    print(f"Started: {datetime.utcnow().isoformat()}")
    print(f"Target: {BASE}")
    print(f"Postgres: {PG_DSN}")
    print("=" * 60)
    errors = ErrorPipeline()
    runner = PlaybookRunner(errors)
    # Iteration loop
    for iteration in range(2):  # 2 iterations for now
        runner.iteration = iteration
        print(f"\n{'=' * 60}")
        print(f" ITERATION {iteration}")
        print(f"{'=' * 60}")
        # NOTE(review): indentation was lost in this snapshot; reconstructed so
        # one-time setup runs only on iteration 0 while stress + iterate run
        # every pass — confirm against the upstream file.
        if iteration == 0:
            phase_ingest(runner, errors)
            phase_embed(runner, errors)
            phase_profiles(runner, errors)
        phase_stress(runner, errors)
        phase_iterate(runner, errors)
    # Flush errors to the lakehouse
    errors.flush_to_lakehouse()
    # Final scorecard
    print(f"\n{'=' * 60}")
    print(" FINAL SCORECARD")
    print(f"{'=' * 60}")
    passed = sum(1 for r in runner.results if r["passed"])
    total = len(runner.results)
    pct = passed / total * 100 if total else 0
    print(f"\n {passed}/{total} passed ({pct:.0f}%)")
    print(f" Error pipeline: {errors.summary()}")
    print(f"\n {'Test':<50} {'ms':>8} {'Iter':>4} {'Status':>6}")
    print(f" {'-'*72}")
    for r in runner.results:
        ms = f"{r['ms']:.0f}" if r['ms'] else ""
        status = "PASS" if r["passed"] else "FAIL"
        print(f" {r['name']:<50} {ms:>8} {r['iteration']:>4} {status:>6}")
    failed = [r for r in runner.results if not r["passed"]]
    if failed:
        print(f"\n FAILURES ({len(failed)}):")
        for r in failed:
            print(f" [{r['iteration']}] {r['name']}: {r['detail'][:80]}")
    return 0 if pct >= 80 else 1


if __name__ == "__main__":
    sys.exit(main())