Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.
WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.
WHAT WAS PROVEN
- Vector retrieval across multi-corpora matrix (chicago_permits + entity
briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
* UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
* REVISE: chains versions, parent.superseded_at + superseded_by stamped
* RETIRE: marks specific trace retired with reason, excluded from retrieval
* HISTORY: walks chain root→tip, cycle-safe
KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces
Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
215 lines
8.0 KiB
TypeScript
#!/usr/bin/env bun
|
|
// Autonomous scrum loop — wraps scrum_master_pipeline.ts + scrum_applier.ts
|
|
// in a goal-driven retry loop. Observer is POSTed an iteration summary at
|
|
// every boundary so it can build meta-commentary outside the loop's epistemic
|
|
// scope.
|
|
//
|
|
// Usage:
|
|
// LOOP_TARGETS="crates/a/src/x.rs,crates/b/src/y.rs" \
|
|
// LOOP_MAX_ITERS=5 \
|
|
// LOOP_PUSH=1 \
|
|
// bun run tests/real-world/autonomous_loop.ts
|
|
//
|
|
// Stop conditions: max_iters reached OR 2 consecutive iters with 0 commits.
|
|
|
|
import { spawn } from "node:child_process";
|
|
import { appendFile, readFile } from "node:fs/promises";
|
|
import { existsSync } from "node:fs";
|
|
|
|
// ── Loop configuration (all overridable via LOOP_* / LH_SCRUM_* env vars) ──
const REPO = "/home/profit/lakehouse";                                  // repo root; every child process runs with this cwd
const OBSERVER = process.env.LOOP_OBSERVER ?? "http://localhost:3800";  // observer service that receives iteration telemetry
const BRANCH = process.env.LOOP_BRANCH ?? "scrum/auto-apply-19814";     // the ONLY branch this loop is allowed to operate on
const MAX_ITERS = Number(process.env.LOOP_MAX_ITERS ?? 3);              // hard upper bound on iterations
const PUSH = process.env.LOOP_PUSH === "1";                             // push to origin after each iter that lands commits
const MIN_CONF = process.env.LOOP_MIN_CONF ?? "85";                     // applier confidence floor, forwarded as LH_APPLIER_MIN_CONF
// Optional override — when unset, let scrum_applier.ts use ITS default
// (currently x-ai/grok-4.1-fast on openrouter). The prior hardcoded
// qwen3-coder:480b default was clobbering the applier's own default
// and forcing every iter to hit the throttled ollama_cloud account.
const APPLIER_MODEL = process.env.LOOP_APPLIER_MODEL;
const APPLIER_PROVIDER = process.env.LOOP_APPLIER_PROVIDER;
// Comma-separated file list the scrum pipeline reviews and the applier patches.
const TARGETS = (process.env.LOOP_TARGETS ?? "crates/queryd/src/service.rs,crates/gateway/src/main.rs,crates/gateway/src/v1/mod.rs")
  .split(",").map(s => s.trim()).filter(Boolean);

// Prompt documents fed to scrum_master_pipeline.ts.
const FORENSIC = process.env.LH_SCRUM_FORENSIC ?? `${REPO}/docs/SCRUM_FORENSIC_PROMPT.md`;
const PROPOSAL = process.env.LH_SCRUM_PROPOSAL ?? `${REPO}/docs/SCRUM_FIX_WAVE.md`;

const LOOP_ID = `loop_${Date.now().toString(36)}`;                      // base36 timestamp — unique id for this loop run
const JOURNAL = `${REPO}/data/_kb/autonomous_loops.jsonl`;              // append-only JSONL journal of IterResult rows
|
// One journal/telemetry row per loop iteration (appended to JOURNAL and
// POSTed to the observer).
interface IterResult {
  iter: number;                               // 1-based iteration index
  scrum_reviews_added: number;                // new rows in scrum_reviews.jsonl this iter
  applier_outcomes: Record<string, number>;   // count of new auto_apply.jsonl rows, keyed by action
  commits_landed: number;                     // commits created since this iter's base SHA
  commit_shas: string[];                      // short SHAs of those commits
  build_status: "green" | "red" | "unknown";  // cargo check result; "unknown" when no commits landed
  duration_ms: number;                        // wall-clock duration of the iteration
}
|
|
|
|
function log(msg: string) {
|
|
const ts = new Date().toISOString().slice(11, 19);
|
|
console.log(`[loop ${LOOP_ID} ${ts}] ${msg}`);
|
|
}
|
|
|
|
function runCmd(cmd: string, args: string[], env: Record<string, string> = {}): Promise<{ code: number; stdout: string; stderr: string }> {
|
|
return new Promise((resolve) => {
|
|
const child = spawn(cmd, args, { cwd: REPO, env: { ...process.env, ...env } });
|
|
let stdout = "", stderr = "";
|
|
child.stdout.on("data", (d) => { stdout += d; process.stdout.write(d); });
|
|
child.stderr.on("data", (d) => { stderr += d; process.stderr.write(d); });
|
|
child.on("close", (code) => resolve({ code: code ?? -1, stdout, stderr }));
|
|
});
|
|
}
|
|
|
|
async function countLines(path: string): Promise<number> {
|
|
if (!existsSync(path)) return 0;
|
|
const text = await readFile(path, "utf8");
|
|
return text.split("\n").filter(Boolean).length;
|
|
}
|
|
|
|
async function gitHeadSha(): Promise<string> {
|
|
const r = await runCmd("git", ["rev-parse", "HEAD"]);
|
|
return r.stdout.trim();
|
|
}
|
|
|
|
async function commitsSince(baseSha: string): Promise<string[]> {
|
|
const r = await runCmd("git", ["log", "--oneline", `${baseSha}..HEAD`]);
|
|
return r.stdout.trim().split("\n").filter(Boolean);
|
|
}
|
|
|
|
async function cargoCheckGreen(): Promise<boolean> {
|
|
log("cargo check --workspace …");
|
|
const r = await runCmd("cargo", ["check", "--workspace", "--quiet"]);
|
|
return r.code === 0;
|
|
}
|
|
|
|
async function postObserver(payload: object) {
|
|
try {
|
|
const r = await fetch(`${OBSERVER}/event`, {
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify(payload),
|
|
signal: AbortSignal.timeout(5000),
|
|
});
|
|
if (!r.ok) log(`observer POST returned ${r.status}`);
|
|
} catch (e: any) {
|
|
log(`observer POST failed: ${e.message}`);
|
|
}
|
|
}
|
|
|
|
// One full loop iteration: run the scrum review pipeline over TARGETS, run
// the applier with commit enabled, then measure what changed (new review
// rows, applier outcomes, commits, build health), journal it, and report it
// to the observer. Progress is measured by before/after deltas on the JSONL
// side-effect files, since the child scripts don't return structured output.
async function runIter(iter: number, baseSha: string): Promise<IterResult> {
  const t0 = Date.now();
  log(`══ iter ${iter} start (base ${baseSha.slice(0, 8)}) targets=${TARGETS.length}`);

  // Snapshot row counts before the children run so we can diff afterwards.
  const reviewsBefore = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
  const applyBefore = await countLines(`${REPO}/data/_kb/auto_apply.jsonl`);

  // Phase 1: forensic review of this iter's target files.
  log(`scrum_master_pipeline.ts → ${TARGETS.length} files`);
  await runCmd("bun", ["run", "tests/real-world/scrum_master_pipeline.ts"], {
    LH_SCRUM_FILES: TARGETS.join(","),
    LH_SCRUM_FORENSIC: FORENSIC,
    LH_SCRUM_PROPOSAL: PROPOSAL,
  });

  // Phase 2: apply the reviews with commit enabled.
  log(`scrum_applier.ts COMMIT=1 MIN_CONF=${MIN_CONF} files=${TARGETS.length}`);
  // Only forward model/provider when explicitly overridden — otherwise
  // let scrum_applier.ts use its own defaults (Grok 4.1 fast on openrouter).
  const applierEnv: Record<string, string> = {
    LH_APPLIER_COMMIT: "1",
    LH_APPLIER_MIN_CONF: MIN_CONF,
    LH_APPLIER_MAX_FILES: String(TARGETS.length),
    LH_APPLIER_BRANCH: BRANCH,
    // Constrain applier to THIS iter's targets so it patches what we
    // just reviewed instead of the highest-confidence file from history.
    LH_APPLIER_FILES: TARGETS.join(","),
  };
  if (APPLIER_MODEL) applierEnv.LH_APPLIER_MODEL = APPLIER_MODEL;
  if (APPLIER_PROVIDER) applierEnv.LH_APPLIER_PROVIDER = APPLIER_PROVIDER;
  await runCmd("bun", ["run", "tests/real-world/scrum_applier.ts"], applierEnv);

  // Measure deltas. Note: .filter(Boolean) here matches countLines'
  // non-empty-line semantics, so slice(applyBefore) yields exactly the rows
  // appended during this iteration.
  const reviewsAfter = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
  const applyAfterText = existsSync(`${REPO}/data/_kb/auto_apply.jsonl`)
    ? await readFile(`${REPO}/data/_kb/auto_apply.jsonl`, "utf8")
    : "";
  const applyRows = applyAfterText.split("\n").filter(Boolean).slice(applyBefore);
  // Tally new applier rows by their "action" field ("?" when absent).
  const outcomes: Record<string, number> = {};
  for (const line of applyRows) {
    try {
      const o = JSON.parse(line);
      outcomes[o.action ?? "?"] = (outcomes[o.action ?? "?"] ?? 0) + 1;
    } catch { /* skip malformed */ }
  }

  const commitShas = await commitsSince(baseSha);
  // Only pay for a cargo check when something actually landed.
  const buildStatus = commitShas.length > 0 ? (await cargoCheckGreen() ? "green" : "red") : "unknown";

  const result: IterResult = {
    iter,
    scrum_reviews_added: reviewsAfter - reviewsBefore,
    applier_outcomes: outcomes,
    commits_landed: commitShas.length,
    // commitsSince returns "sha subject" lines; keep just the short sha.
    commit_shas: commitShas.map(s => s.split(" ")[0]),
    build_status: buildStatus,
    duration_ms: Date.now() - t0,
  };

  log(`iter ${iter} done — reviews+${result.scrum_reviews_added} commits=${result.commits_landed} build=${buildStatus} (${(result.duration_ms / 1000).toFixed(1)}s)`);

  // Report to the observer (best-effort) so meta-commentary can be built
  // outside the loop's own epistemic scope.
  await postObserver({
    source: "autonomous_loop",
    loop_id: LOOP_ID,
    event_kind: "iteration_complete",
    iter,
    targets: TARGETS,
    success: buildStatus !== "red",
    scrum_reviews_added: result.scrum_reviews_added,
    applier_outcomes: result.applier_outcomes,
    commits_landed: result.commits_landed,
    commit_shas: result.commit_shas,
    build_status: buildStatus,
    duration_ms: result.duration_ms,
    ts: new Date().toISOString(),
  });

  // Durable local record, one JSON object per line.
  await appendFile(JOURNAL, JSON.stringify({ loop_id: LOOP_ID, ...result, ts: new Date().toISOString() }) + "\n");

  return result;
}
|
|
|
|
async function main() {
|
|
log(`autonomous loop starting · branch=${BRANCH} max_iters=${MAX_ITERS} push=${PUSH}`);
|
|
log(`targets: ${TARGETS.join(", ")}`);
|
|
|
|
const branchR = await runCmd("git", ["branch", "--show-current"]);
|
|
if (branchR.stdout.trim() !== BRANCH) {
|
|
log(`ERROR: on branch ${branchR.stdout.trim()}, expected ${BRANCH}. Refusing to run.`);
|
|
process.exit(1);
|
|
}
|
|
|
|
let consecutiveZero = 0;
|
|
for (let iter = 1; iter <= MAX_ITERS; iter++) {
|
|
const baseSha = await gitHeadSha();
|
|
const result = await runIter(iter, baseSha);
|
|
|
|
if (PUSH && result.commits_landed > 0) {
|
|
log(`git push origin ${BRANCH}`);
|
|
const pushR = await runCmd("git", ["push", "origin", BRANCH]);
|
|
if (pushR.code !== 0) log(`push failed (continuing): ${pushR.stderr.slice(0, 200)}`);
|
|
}
|
|
|
|
consecutiveZero = result.commits_landed === 0 ? consecutiveZero + 1 : 0;
|
|
if (consecutiveZero >= 2) {
|
|
log(`STOP: 2 consecutive iters with 0 commits. Loop converged or stuck.`);
|
|
break;
|
|
}
|
|
}
|
|
|
|
log(`loop ${LOOP_ID} complete. Journal: ${JOURNAL}`);
|
|
}
|
|
|
|
main().catch((e) => {
|
|
log(`FATAL: ${e.message}`);
|
|
process.exit(1);
|
|
});
|