#!/usr/bin/env bun // Agent harness — runs local qwen3.5:latest as an autonomous agent // against PRD.md. Exposes a tool-call loop. Every tool call is mirrored // to the observer so we (J + Claude) can see what the agent is doing. // // Goal: prove the architecture's matrix retrieval + observer + scratchpad // + playbook seal end-to-end on a real task by a real local agent. // // Iter 1: just run it. Watch where it gets stuck. // Iter N: tune helpers based on what we observed. import { appendFile, readFile } from "node:fs/promises"; import { existsSync, mkdirSync } from "node:fs"; const GATEWAY = "http://localhost:3100"; const SIDECAR = "http://localhost:3200"; const OBSERVER = "http://localhost:3800"; const PRD_PATH = "/home/profit/lakehouse/tests/agent_test/PRD.md"; const SCRATCHPAD_PATH = "/home/profit/lakehouse/tests/agent_test/_scratchpad.txt"; const TRACE_PATH = "/home/profit/lakehouse/tests/agent_test/_trace.jsonl"; const FINAL_PATH = "/home/profit/lakehouse/tests/agent_test/_final.md"; const PERMITS_RAW = "/tmp/vectorize_raw/chicago_permits_2026-04-25.json"; const AGENT_MODEL = process.env.AGENT_MODEL ?? "qwen3.5:latest"; const MAX_STEPS = Number(process.env.AGENT_MAX_STEPS ?? 15); const SESSION_ID = `agent_${Date.now().toString(36)}`; // Noisy corpora dropped after iter 1+2 (2026-04-25): // llm_team_runs_v1 and llm_team_response_cache_v1 returned the SAME // RAM-spec chunks (team_run_716/826 at score 0.59) regardless of query. // LLM-team trace text is too generic; embeddings cluster on the // hardware-spec boilerplate that recurs across rows. Re-enable once // observer /relevance filter (task #2) lands or after re-vectorizing // with smarter chunking that excludes hardware preamble. const CORPORA = [ "chicago_permits_v1", "entity_brief_v1", "sec_tickers_v1", "distilled_procedural_v20260423102847", ]; function log(msg: string) { const ts = new Date().toISOString().slice(11, 19); console.log(`[harness ${ts}] ${msg}`); } async function emitObserverEvent(payload: object) { try { await fetch(`${OBSERVER}/event`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ source: "agent_test", session_id: SESSION_ID, ...payload, ts: new Date().toISOString() }), signal: AbortSignal.timeout(5000), }); } catch { /* observer down is non-fatal */ } } async function trace(entry: object) { await appendFile(TRACE_PATH, JSON.stringify({ ts: new Date().toISOString(), session_id: SESSION_ID, ...entry }) + "\n"); } // ─── TOOLS — what the agent can call ─── let permitsCache: any[] | null = null; async function loadPermits(): Promise { if (permitsCache) return permitsCache; if (!existsSync(PERMITS_RAW)) { // Fetch from raw bucket via mc const proc = Bun.spawn(["mc", "cp", "-q", "local/raw/chicago/permits_2026-04-25.json", PERMITS_RAW]); await proc.exited; } permitsCache = JSON.parse(await readFile(PERMITS_RAW, "utf8")); return permitsCache!; } async function tool_list_permits(args: { min_cost?: number; permit_type?: string }): Promise { const all = await loadPermits(); let filtered = all.filter(p => p.contact_1_name || p.contact_2_name); if (args.min_cost) filtered = filtered.filter(p => Number(p.reported_cost ?? 0) >= args.min_cost!); if (args.permit_type) filtered = filtered.filter(p => (p.permit_type ?? "").toLowerCase().includes(args.permit_type!.toLowerCase())); filtered.sort((a, b) => Number(b.reported_cost ?? 0) - Number(a.reported_cost ?? 0)); const out = filtered.slice(0, 5).map(p => `- permit_id=${p.permit_} type=${p.permit_type} cost=$${Number(p.reported_cost ?? 0).toLocaleString()} contractor=${p.contact_1_name ?? "?"}` ).join("\n"); return `Top ${Math.min(5, filtered.length)} of ${filtered.length} matching permits:\n${out}`; } async function tool_read_permit(args: { permit_id: string }): Promise { const all = await loadPermits(); const p = all.find(x => x.permit_ === args.permit_id); if (!p) return `permit ${args.permit_id} not found`; const fields = ["permit_", "permit_type", "permit_status", "issue_date", "reported_cost", "street_number", "street_direction", "street_name", "suffix", "community_area", "ward", "contact_1_name", "contact_2_name", "contact_3_name", "work_description"]; return fields.map(f => `${f}: ${p[f] ?? ""}`).join("\n"); } async function tool_query_matrix(args: { query: string; top_k?: number }): Promise { const k = args.top_k ?? 3; const all: Array<{ corpus: string; score: number; doc_id: string; text: string }> = []; const perCorpus: Record = {}; await Promise.all(CORPORA.map(async (corpus) => { try { const r = await fetch(`${GATEWAY}/vectors/search`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ index_name: corpus, query: args.query, top_k: k }), signal: AbortSignal.timeout(10000), }); if (!r.ok) { perCorpus[corpus] = -1; return; } const data: any = await r.json(); const results = data.results ?? []; perCorpus[corpus] = results.length; for (const h of results) { all.push({ corpus, score: Number(h.score ?? 0), doc_id: String(h.doc_id ?? "?"), text: String(h.chunk_text ?? "").slice(0, 300) }); } } catch { perCorpus[corpus] = -1; } })); all.sort((a, b) => b.score - a.score); const top = all.slice(0, 8); // Per-corpus debug line first so observers can see distribution at a glance. const dist = Object.entries(perCorpus).map(([k, v]) => `${k.split("_v")[0]}=${v}`).join(" "); if (top.length === 0) return `no matrix evidence for: ${args.query}\n(per-corpus: ${dist})`; return `(per-corpus: ${dist})\n` + top.map((h, i) => `[${i + 1}] ${h.corpus} score=${h.score.toFixed(2)} doc=${h.doc_id}\n ${h.text.replace(/\s+/g, " ").trim()}`).join("\n"); } async function tool_note(args: { text: string }): Promise { const stamp = new Date().toISOString().slice(11, 19); await appendFile(SCRATCHPAD_PATH, `[${stamp}] ${args.text}\n`); return `noted (${args.text.length} chars)`; } async function tool_read_scratchpad(): Promise { if (!existsSync(SCRATCHPAD_PATH)) return "(empty)"; return await readFile(SCRATCHPAD_PATH, "utf8"); } async function tool_done(args: { summary: string }): Promise { const fs = await import("node:fs/promises"); await fs.writeFile(FINAL_PATH, args.summary); return `done; final saved to ${FINAL_PATH} (${args.summary.length} chars)`; } const TOOLS: Record Promise> = { list_permits: tool_list_permits, read_permit: tool_read_permit, query_matrix: tool_query_matrix, note: tool_note, read_scratchpad: tool_read_scratchpad, done: tool_done, }; const TOOL_SCHEMA = `Available tools (call by emitting JSON like: {"tool": "name", "args": {...}}): - list_permits(min_cost?: number, permit_type?: string) — top 5 by cost - read_permit(permit_id: string) — full permit fields - query_matrix(query: string, top_k?: number) — search KB - note(text: string) — append to scratchpad - read_scratchpad() — read your scratchpad - done(summary: string) — finish; pass final markdown analysis`; // ─── AGENT LOOP ─── async function callAgent(messages: Array<{role: string; content: string}>): Promise { // think:false disables hidden reasoning so all generated tokens go to // visible response. qwen3.5:latest defaults to thinking and silently // burns the token budget otherwise. const r = await fetch(`${SIDECAR}/generate`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ model: AGENT_MODEL, prompt: messages.map(m => `${m.role.toUpperCase()}:\n${m.content}`).join("\n\n") + "\n\nASSISTANT:\n", stream: false, max_tokens: 1500, think: false, }), signal: AbortSignal.timeout(180000), }); if (!r.ok) throw new Error(`agent ${r.status}: ${(await r.text()).slice(0, 200)}`); const j: any = await r.json(); return String(j.text ?? j.response ?? "").trim(); } function extractToolCall(response: string): { tool: string; args: any } | null { // Look for JSON block in the response const fenced = response.match(/```(?:json)?\s*(\{[\s\S]+?\})\s*```/); const candidate = fenced ? fenced[1] : (response.match(/\{[\s\S]*\}/)?.[0] ?? null); if (!candidate) return null; try { const parsed = JSON.parse(candidate); if (parsed.tool && typeof parsed.tool === "string") return { tool: parsed.tool, args: parsed.args ?? {} }; } catch { /* not JSON */ } return null; } async function main() { log(`session=${SESSION_ID} model=${AGENT_MODEL} max_steps=${MAX_STEPS}`); // Reset workspace files for this session for (const p of [SCRATCHPAD_PATH, TRACE_PATH, FINAL_PATH]) { try { await Bun.write(p, ""); } catch { /* ignore */ } } const prd = await readFile(PRD_PATH, "utf8"); log(`loaded PRD (${prd.length} chars)`); await emitObserverEvent({ event_kind: "agent_start", model: AGENT_MODEL }); // Pre-flight: pull prior accepted pathway traces for this task class // and surface them as a "PROVEN APPROACHES" preamble. This closes the // matrix loop — successful past runs now actively help the next agent. let priorPlaybooks = ""; try { const stateFile = Bun.file("/home/profit/lakehouse/data/_pathway_memory/state.json"); if (await stateFile.exists()) { const state: any = JSON.parse(await stateFile.text()); const matched: any[] = []; for (const traces of Object.values(state.pathways ?? {}) as any[][]) { for (const t of traces) { if (t.task_class === "chicago_permit_analysis" && t.final_verdict === "accepted" && !t.retired) { matched.push(t); } } } matched.sort((a, b) => (b.created_at ?? "").localeCompare(a.created_at ?? "")); if (matched.length > 0) { const top = matched.slice(0, 2); priorPlaybooks = "\n\n═══ 📖 PROVEN APPROACHES FROM PRIOR ACCEPTED RUNS ═══\n" + top.map((t, i) => `[${i + 1}] pathway=${t.pathway_id?.slice(0, 12)} previously succeeded on ${t.file_path}\n` + `Approach excerpt:\n${(t.reducer_summary ?? "").slice(0, 800)}` ).join("\n\n") + "\n═══ end proven approaches ═══\n\nUse these as REFERENCE for what worked. Don't copy verbatim, but follow the same workflow shape (plan → list → read → matrix → analyze → done).\n"; log(`📖 found ${matched.length} prior accepted pathway(s) for chicago_permit_analysis — top ${top.length} prepended to agent context`); } else { log(`📖 no prior accepted pathways for chicago_permit_analysis (this is the first run)`); } } } catch (e: any) { log(`📖 pathway preamble skipped: ${e.message}`); } const systemMsg = `You are an autonomous agent. Read the PRD below and follow its instructions exactly. ${TOOL_SCHEMA} To call a tool, respond with ONLY a JSON object: {"tool": "", "args": {...}} No markdown, no explanation around it. The harness will execute the tool and give you the result, then ask you what to do next. When you are completely finished, call done(summary="").`; const messages: Array<{role: string; content: string}> = [ { role: "system", content: systemMsg }, { role: "user", content: `PRD:\n\n${prd}${priorPlaybooks}\n\nNow respond. Remember: PLAN first via note() before executing.` }, ]; // Iter 3 surfaced: when the matrix returns real evidence, the agent // gets analysis paralysis — keeps calling note() to refine instead of // producing the final output. Guard: after MAX_CONSECUTIVE_NOTES // note() calls in a row, harness injects a hard-stop user message // telling the agent it MUST call done() next. const MAX_CONSECUTIVE_NOTES = Number(process.env.AGENT_MAX_CONSECUTIVE_NOTES ?? 2); let consecutiveNotes = 0; let isDone = false; for (let step = 1; step <= MAX_STEPS && !isDone; step++) { log(`step ${step}/${MAX_STEPS} — calling agent...`); const t0 = Date.now(); let response: string; try { response = await callAgent(messages); } catch (e: any) { log(` ✗ agent error: ${e.message}`); await trace({ step, kind: "error", error: e.message }); await emitObserverEvent({ event_kind: "agent_error", step, error: e.message }); break; } const ms = Date.now() - t0; log(` · agent responded ${response.length} chars in ${ms}ms`); await trace({ step, kind: "agent_response", chars: response.length, latency_ms: ms, response: response.slice(0, 4000) }); const call = extractToolCall(response); if (!call) { log(` ⚠ no tool call extracted from response — agent may be confused`); await trace({ step, kind: "no_tool_call", preview: response.slice(0, 500) }); await emitObserverEvent({ event_kind: "agent_no_tool", step, preview: response.slice(0, 200) }); // Push the agent: tell it to call a tool messages.push({ role: "assistant", content: response }); messages.push({ role: "user", content: `Your last response did not contain a valid tool call. Respond with ONLY a JSON object like {"tool": "note", "args": {"text": "..."}}. No prose around it.` }); continue; } log(` → tool: ${call.tool}(${JSON.stringify(call.args).slice(0, 200)})`); if (!TOOLS[call.tool]) { const err = `unknown tool: ${call.tool}`; log(` ✗ ${err}`); await trace({ step, kind: "tool_unknown", tool: call.tool }); await emitObserverEvent({ event_kind: "tool_unknown", step, tool: call.tool }); messages.push({ role: "assistant", content: response }); messages.push({ role: "user", content: `Tool "${call.tool}" does not exist. Available: ${Object.keys(TOOLS).join(", ")}. Try again.` }); continue; } const resStart = Date.now(); let result: string; try { result = await TOOLS[call.tool](call.args); } catch (e: any) { result = `TOOL ERROR: ${e.message}`; } const resMs = Date.now() - resStart; log(` ← ${result.slice(0, 200)}${result.length > 200 ? "..." : ""} (${resMs}ms)`); await trace({ step, kind: "tool_call", tool: call.tool, args: call.args, result: result.slice(0, 4000), latency_ms: resMs }); await emitObserverEvent({ event_kind: "tool_call", step, tool: call.tool, result_chars: result.length }); if (call.tool === "done") { isDone = true; log(` ✓ DONE`); await emitObserverEvent({ event_kind: "agent_done", step }); break; } // Track consecutive note() calls; force done() if too many in a row. if (call.tool === "note") consecutiveNotes++; else consecutiveNotes = 0; messages.push({ role: "assistant", content: response }); if (consecutiveNotes >= MAX_CONSECUTIVE_NOTES) { log(` ⚠ ${consecutiveNotes} consecutive note() calls — forcing done() next`); await emitObserverEvent({ event_kind: "force_done_pressure", step, consecutive_notes: consecutiveNotes }); messages.push({ role: "user", content: `Tool result:\n${result}\n\nYou have called note() ${consecutiveNotes} times in a row without producing output. STOP NOTING. Call done(summary="") NOW with whatever analysis you have. Do not call note() again. Respond with ONLY: {"tool": "done", "args": {"summary": "..."}}` }); consecutiveNotes = 0; // reset so we only push once per streak } else { messages.push({ role: "user", content: `Tool result:\n${result}\n\nWhat next?` }); } } if (!isDone) { log(`✗ agent did not complete within ${MAX_STEPS} steps`); await emitObserverEvent({ event_kind: "agent_max_steps", final_step: MAX_STEPS }); // Mem0: any partial trace this session inserted should be retired // so future agents don't get a broken playbook in their preamble. // We don't have a trace_uid for this session yet (insert happens // on done); but if any prior trace has the same workflow shape as // this session's tool sequence, retire it. // For now, just log — actual retirement would happen if seal had run. log(` ⚠ no playbook seal will be performed for failed run`); } log(`session ${SESSION_ID} ended. Trace: ${TRACE_PATH}`); if (existsSync(FINAL_PATH)) log(`Final output: ${FINAL_PATH}`); } mkdirSync("/home/profit/lakehouse/tests/agent_test", { recursive: true }); main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });