profit ac01fffd9a checkpoint: matrix-agent-validated (2026-04-25)
Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.

WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.

WHAT WAS PROVEN
- Vector retrieval across a multi-corpus matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
    * UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
    * REVISE: chains versions, parent.superseded_at + superseded_by stamped
    * RETIRE: marks specific trace retired with reason, excluded from retrieval
    * HISTORY: walks chain root→tip, cycle-safe

KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces

Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:43:27 -05:00

215 lines
8.0 KiB
TypeScript

#!/usr/bin/env bun
// Autonomous scrum loop — wraps scrum_master_pipeline.ts + scrum_applier.ts
// in a goal-driven retry loop. Observer is POSTed an iteration summary at
// every boundary so it can build meta-commentary outside the loop's epistemic
// scope.
//
// Usage:
// LOOP_TARGETS="crates/a/src/x.rs,crates/b/src/y.rs" \
// LOOP_MAX_ITERS=5 \
// LOOP_PUSH=1 \
// bun run tests/real-world/autonomous_loop.ts
//
// Stop conditions: max_iters reached OR 2 consecutive iters with 0 commits.
import { spawn } from "node:child_process";
import { appendFile, readFile } from "node:fs/promises";
import { existsSync } from "node:fs";
// ── Loop configuration (all env-overridable) ────────────────────────────────
// Repo root the loop operates in; every child process runs with this cwd.
const REPO = "/home/profit/lakehouse";
// Observer endpoint that receives per-iteration summaries (see postObserver).
const OBSERVER = process.env.LOOP_OBSERVER ?? "http://localhost:3800";
// Branch the loop is allowed to commit/push to; main() refuses to run elsewhere.
const BRANCH = process.env.LOOP_BRANCH ?? "scrum/auto-apply-19814";
// Max iterations. Guard against NaN from a malformed LOOP_MAX_ITERS value:
// Number("garbage") is NaN, and `iter <= NaN` is always false, which would
// silently skip the whole loop. A valid numeric value (including 0) is kept.
const MAX_ITERS_RAW = Number(process.env.LOOP_MAX_ITERS ?? 3);
const MAX_ITERS = Number.isFinite(MAX_ITERS_RAW) ? MAX_ITERS_RAW : 3;
const PUSH = process.env.LOOP_PUSH === "1";
// Minimum applier confidence, forwarded verbatim as a string env var.
const MIN_CONF = process.env.LOOP_MIN_CONF ?? "85";
// Optional override — when unset, let scrum_applier.ts use ITS default
// (currently x-ai/grok-4.1-fast on openrouter). The prior hardcoded
// qwen3-coder:480b default was clobbering the applier's own default
// and forcing every iter to hit the throttled ollama_cloud account.
const APPLIER_MODEL = process.env.LOOP_APPLIER_MODEL;
const APPLIER_PROVIDER = process.env.LOOP_APPLIER_PROVIDER;
// Comma-separated list of target files to review/patch; empty entries dropped.
const TARGETS = (process.env.LOOP_TARGETS ?? "crates/queryd/src/service.rs,crates/gateway/src/main.rs,crates/gateway/src/v1/mod.rs")
  .split(",").map(s => s.trim()).filter(Boolean);
const FORENSIC = process.env.LH_SCRUM_FORENSIC ?? `${REPO}/docs/SCRUM_FORENSIC_PROMPT.md`;
const PROPOSAL = process.env.LH_SCRUM_PROPOSAL ?? `${REPO}/docs/SCRUM_FIX_WAVE.md`;
// Unique id for this run (base36 timestamp) — tags every log line and journal row.
const LOOP_ID = `loop_${Date.now().toString(36)}`;
const JOURNAL = `${REPO}/data/_kb/autonomous_loops.jsonl`;
/** Summary of one loop iteration, journaled to JSONL and POSTed to the observer. */
interface IterResult {
/** 1-based iteration number. */
iter: number;
/** New rows appended to scrum_reviews.jsonl during this iteration. */
scrum_reviews_added: number;
/** Count per applier `action` value parsed from new auto_apply.jsonl rows. */
applier_outcomes: Record<string, number>;
/** Number of commits landed on top of the iteration's base sha. */
commits_landed: number;
/** Short shas (first token of each `git log --oneline` row). */
commit_shas: string[];
/** "green"/"red" from cargo check; "unknown" when no commits landed (check skipped). */
build_status: "green" | "red" | "unknown";
/** Wall-clock duration of the iteration in milliseconds. */
duration_ms: number;
}
/** Prefixed console logger: tags each message with the loop id and an HH:MM:SS UTC stamp. */
function log(msg: string) {
  const stamp = new Date().toISOString().substring(11, 19);
  console.log(`[loop ${LOOP_ID} ${stamp}] ${msg}`);
}
/**
 * Spawn a child process in REPO, teeing stdout/stderr through to this
 * process while also capturing both streams. Never rejects — resolves with
 * the exit code (-1 when the process closed without one).
 */
function runCmd(cmd: string, args: string[], env: Record<string, string> = {}): Promise<{ code: number; stdout: string; stderr: string }> {
  return new Promise((resolve) => {
    let outBuf = "";
    let errBuf = "";
    const child = spawn(cmd, args, { cwd: REPO, env: { ...process.env, ...env } });
    child.stdout.on("data", (chunk) => {
      outBuf += chunk;
      process.stdout.write(chunk);
    });
    child.stderr.on("data", (chunk) => {
      errBuf += chunk;
      process.stderr.write(chunk);
    });
    child.on("close", (exitCode) => {
      resolve({ code: exitCode ?? -1, stdout: outBuf, stderr: errBuf });
    });
  });
}
/** Count non-empty lines in a file; returns 0 when the file does not exist. */
async function countLines(path: string): Promise<number> {
  if (!existsSync(path)) return 0;
  const contents = await readFile(path, "utf8");
  let count = 0;
  for (const line of contents.split("\n")) {
    if (line) count += 1;
  }
  return count;
}
/** Current HEAD commit sha of REPO, via `git rev-parse HEAD`. */
async function gitHeadSha(): Promise<string> {
  const { stdout } = await runCmd("git", ["rev-parse", "HEAD"]);
  return stdout.trim();
}
/** One-line summaries of commits landed after baseSha; [] when none. */
async function commitsSince(baseSha: string): Promise<string[]> {
  const { stdout } = await runCmd("git", ["log", "--oneline", `${baseSha}..HEAD`]);
  const trimmed = stdout.trim();
  return trimmed ? trimmed.split("\n").filter(Boolean) : [];
}
/** True when `cargo check --workspace` exits 0 for the repo. */
async function cargoCheckGreen(): Promise<boolean> {
  log("cargo check --workspace …");
  const { code } = await runCmd("cargo", ["check", "--workspace", "--quiet"]);
  return code === 0;
}
/**
 * Best-effort POST of a JSON event to the observer's /event endpoint.
 * Failures (network error, non-2xx status, 5s timeout) are logged and
 * swallowed so observer unavailability never kills the loop.
 */
async function postObserver(payload: object) {
  try {
    const r = await fetch(`${OBSERVER}/event`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(payload),
      // Abort rather than hang the loop on an unresponsive observer.
      signal: AbortSignal.timeout(5000),
    });
    if (!r.ok) log(`observer POST returned ${r.status}`);
  } catch (e: unknown) {
    // Was `catch (e: any)` — a non-Error throw has no .message and would
    // log "undefined". Narrow explicitly before reading it.
    const msg = e instanceof Error ? e.message : String(e);
    log(`observer POST failed: ${msg}`);
  }
}
/**
 * Run one full loop iteration: scrum review pass → applier pass → measure
 * what landed, then journal the result and POST it to the observer.
 * @param iter    1-based iteration number (logging/journaling only).
 * @param baseSha HEAD sha before this iteration; commits are counted from it.
 * @returns the iteration summary that was journaled.
 */
async function runIter(iter: number, baseSha: string): Promise<IterResult> {
const t0 = Date.now();
log(`══ iter ${iter} start (base ${baseSha.slice(0, 8)}) targets=${TARGETS.length}`);
// Snapshot non-empty line counts BEFORE the two child pipelines run, so the
// deltas below attribute new rows to this iteration only.
const reviewsBefore = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
const applyBefore = await countLines(`${REPO}/data/_kb/auto_apply.jsonl`);
log(`scrum_master_pipeline.ts → ${TARGETS.length} files`);
await runCmd("bun", ["run", "tests/real-world/scrum_master_pipeline.ts"], {
LH_SCRUM_FILES: TARGETS.join(","),
LH_SCRUM_FORENSIC: FORENSIC,
LH_SCRUM_PROPOSAL: PROPOSAL,
});
log(`scrum_applier.ts COMMIT=1 MIN_CONF=${MIN_CONF} files=${TARGETS.length}`);
// Only forward model/provider when explicitly overridden — otherwise
// let scrum_applier.ts use its own defaults (Grok 4.1 fast on openrouter).
const applierEnv: Record<string, string> = {
LH_APPLIER_COMMIT: "1",
LH_APPLIER_MIN_CONF: MIN_CONF,
LH_APPLIER_MAX_FILES: String(TARGETS.length),
LH_APPLIER_BRANCH: BRANCH,
// Constrain applier to THIS iter's targets so it patches what we
// just reviewed instead of the highest-confidence file from history.
LH_APPLIER_FILES: TARGETS.join(","),
};
if (APPLIER_MODEL) applierEnv.LH_APPLIER_MODEL = APPLIER_MODEL;
if (APPLIER_PROVIDER) applierEnv.LH_APPLIER_PROVIDER = APPLIER_PROVIDER;
await runCmd("bun", ["run", "tests/real-world/scrum_applier.ts"], applierEnv);
const reviewsAfter = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
const applyAfterText = existsSync(`${REPO}/data/_kb/auto_apply.jsonl`)
? await readFile(`${REPO}/data/_kb/auto_apply.jsonl`, "utf8")
: "";
// Rows appended this iteration: the same non-empty-line filter as
// countLines, so slicing at `applyBefore` lines up with the snapshot above.
const applyRows = applyAfterText.split("\n").filter(Boolean).slice(applyBefore);
// Tally applier outcomes by their `action` field ("?" when absent).
const outcomes: Record<string, number> = {};
for (const line of applyRows) {
try {
const o = JSON.parse(line);
outcomes[o.action ?? "?"] = (outcomes[o.action ?? "?"] ?? 0) + 1;
} catch { /* skip malformed */ }
}
const commitShas = await commitsSince(baseSha);
// cargo check is only worth running when something landed; otherwise report
// "unknown" rather than re-checking an unchanged tree.
const buildStatus = commitShas.length > 0 ? (await cargoCheckGreen() ? "green" : "red") : "unknown";
const result: IterResult = {
iter,
scrum_reviews_added: reviewsAfter - reviewsBefore,
applier_outcomes: outcomes,
commits_landed: commitShas.length,
// First token of each --oneline row is the short sha.
commit_shas: commitShas.map(s => s.split(" ")[0]),
build_status: buildStatus,
duration_ms: Date.now() - t0,
};
log(`iter ${iter} done — reviews+${result.scrum_reviews_added} commits=${result.commits_landed} build=${buildStatus} (${(result.duration_ms / 1000).toFixed(1)}s)`);
// Observer event mirrors the journal row plus a coarse success flag
// (anything except a red build counts as success).
await postObserver({
source: "autonomous_loop",
loop_id: LOOP_ID,
event_kind: "iteration_complete",
iter,
targets: TARGETS,
success: buildStatus !== "red",
scrum_reviews_added: result.scrum_reviews_added,
applier_outcomes: result.applier_outcomes,
commits_landed: result.commits_landed,
commit_shas: result.commit_shas,
build_status: buildStatus,
duration_ms: result.duration_ms,
ts: new Date().toISOString(),
});
await appendFile(JOURNAL, JSON.stringify({ loop_id: LOOP_ID, ...result, ts: new Date().toISOString() }) + "\n");
return result;
}
/**
 * Entry point: verify we are on the expected branch, then run up to
 * MAX_ITERS iterations, pushing after any iteration that landed commits
 * (when PUSH is set). Stops early after 2 consecutive zero-commit iters.
 */
async function main() {
  log(`autonomous loop starting · branch=${BRANCH} max_iters=${MAX_ITERS} push=${PUSH}`);
  log(`targets: ${TARGETS.join(", ")}`);
  // Hard guard: never commit/push from the wrong branch.
  const currentBranch = (await runCmd("git", ["branch", "--show-current"])).stdout.trim();
  if (currentBranch !== BRANCH) {
    log(`ERROR: on branch ${currentBranch}, expected ${BRANCH}. Refusing to run.`);
    process.exit(1);
  }
  let zeroStreak = 0;
  for (let iter = 1; iter <= MAX_ITERS; iter++) {
    const baseSha = await gitHeadSha();
    const result = await runIter(iter, baseSha);
    if (PUSH && result.commits_landed > 0) {
      log(`git push origin ${BRANCH}`);
      const pushR = await runCmd("git", ["push", "origin", BRANCH]);
      // A failed push is non-fatal — later iterations may push the backlog.
      if (pushR.code !== 0) log(`push failed (continuing): ${pushR.stderr.slice(0, 200)}`);
    }
    zeroStreak = result.commits_landed === 0 ? zeroStreak + 1 : 0;
    if (zeroStreak >= 2) {
      log(`STOP: 2 consecutive iters with 0 commits. Loop converged or stuck.`);
      break;
    }
  }
  log(`loop ${LOOP_ID} complete. Journal: ${JOURNAL}`);
}
// Top-level failure handler: log and exit nonzero so callers see the failure.
main().catch((e: unknown) => {
  // Narrow before touching .message — a non-Error throw (string, object)
  // would otherwise log "undefined".
  log(`FATAL: ${e instanceof Error ? e.message : String(e)}`);
  process.exit(1);
});