Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.
WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.
WHAT WAS PROVEN
- Vector retrieval across multi-corpora matrix (chicago_permits + entity
briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
* UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
* REVISE: chains versions, parent.superseded_at + superseded_by stamped
* RETIRE: marks specific trace retired with reason, excluded from retrieval
* HISTORY: walks chain root→tip, cycle-safe
KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces
Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
260 lines
11 KiB
TypeScript
#!/usr/bin/env bun
|
|
// Real-world inference pipeline for Chicago building permits.
|
|
// Uses the unified matrix retriever (chicago_permits + entity_brief +
|
|
// sec_tickers + llm_team_runs + distilled_procedural) to enrich a
|
|
// Grok 4.1 fast analysis. Observer hand-reviews each result.
|
|
//
|
|
// First true USE of the matrix architecture on real ingested data —
|
|
// not the scrum self-improvement loop, the staffing intelligence loop.
|
|
//
|
|
// Usage:
|
|
// bun run scripts/analyze_chicago_contracts.ts [N]
|
|
// N = number of permits to analyze (default 5)
|
|
|
|
const GATEWAY = process.env.LAKEHOUSE_URL ?? "http://localhost:3100";
|
|
const OBSERVER = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
|
|
const RAW_BUCKET = "raw";
|
|
const MC_ALIAS = "local";
|
|
const STAGE_DIR = "/tmp/chicago_analyze";
|
|
const OUTPUT = "/home/profit/lakehouse/data/_kb/contract_analyses.jsonl";
|
|
|
|
const CONTRACT_CORPORA = [
|
|
"chicago_permits_v1",
|
|
"entity_brief_v1",
|
|
"sec_tickers_v1",
|
|
"llm_team_runs_v1",
|
|
"llm_team_response_cache_v1",
|
|
"distilled_procedural_v20260423102847",
|
|
];
|
|
|
|
// Shape of one record in the raw Chicago permits dump. All fields are optional
// because the city export is sparse; the index signature admits the many extra
// columns this script does not touch.
// NOTE(review): `permit_` (trailing underscore) appears to mirror the dataset's
// own column name for the permit number — confirm against the raw dump.
interface Permit {
  permit_?: string;                 // permit number (used as permit_id downstream)
  permit_type?: string;
  permit_status?: string;
  work_description?: string;
  reported_cost?: string | number;  // integer-like string in the raw data (see fetchPermits)
  contact_1?: any;                  // may be a plain string or an object with .name (see permitToText)
  contact_2?: any;                  // same dual shape as contact_1
  contact_3_name?: string;          // rendered as the owner name in permitToText
  street_number?: string;
  street_direction?: string;
  street_name?: string;
  suffix?: string;                  // street suffix (ST, AVE, ...)
  issue_date?: string;
  community_area?: string;
  ward?: string;
  [k: string]: any;                 // tolerate any other columns from the city export
}
|
|
|
|
// One retrieval hit from a single corpus in the matrix search. Fields are
// mapped from the gateway's /vectors/search response in fetchMatrixHits.
interface MatrixHit {
  source_corpus: string;  // which CONTRACT_CORPORA index the hit came from
  score: number;          // similarity score as returned by the gateway
  doc_id: string;
  text: string;           // chunk_text, truncated to 400 chars at ingest
}
|
|
|
|
// Timestamped console logger: prefixes each message with the current UTC
// clock time (HH:MM:SS) under a fixed "[contract ...]" tag.
function log(msg: string) {
  const clock = new Date().toISOString().slice(11, 19);
  console.log(`[contract ${clock}] ${msg}`);
}
|
|
|
|
async function fetchPermits(n: number): Promise<Permit[]> {
|
|
const fs = await import("node:fs/promises");
|
|
await fs.mkdir(STAGE_DIR, { recursive: true });
|
|
const local = `${STAGE_DIR}/permits.json`;
|
|
const proc = Bun.spawn(["mc", "cp", "-q", `${MC_ALIAS}/${RAW_BUCKET}/chicago/permits_2026-04-25.json`, local]);
|
|
await proc.exited;
|
|
const all: Permit[] = JSON.parse(await Bun.file(local).text());
|
|
// Pick high-cost permits with named contractors — most interesting for staffing analysis.
|
|
// Field is `contact_1_name`, not `contact_1`. reported_cost is integer-like string.
|
|
const meaningful = all.filter(p =>
|
|
p.reported_cost && Number(p.reported_cost) >= 100000 &&
|
|
(p.contact_1_name || p.contact_2_name)
|
|
);
|
|
log(`raw permits: ${all.length} · meaningful (cost >= $100k + has contractor): ${meaningful.length}`);
|
|
// Sample evenly across the meaningful set
|
|
const sampled: Permit[] = [];
|
|
const stride = Math.max(1, Math.floor(meaningful.length / n));
|
|
for (let i = 0; i < meaningful.length && sampled.length < n; i += stride) {
|
|
sampled.push(meaningful[i]);
|
|
}
|
|
return sampled;
|
|
}
|
|
|
|
function permitToText(p: Permit): string {
|
|
const addr = `${p.street_number ?? ""} ${p.street_direction ?? ""} ${p.street_name ?? ""} ${p.suffix ?? ""}`.replace(/\s+/g, " ").trim();
|
|
const c1 = (p as any).contact_1_name ?? (typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? ""));
|
|
const c2 = (p as any).contact_2_name ?? (typeof p.contact_2 === "string" ? p.contact_2 : (p.contact_2?.name ?? ""));
|
|
return [
|
|
`Chicago Building Permit ${p.permit_ ?? "?"}`,
|
|
`Type: ${p.permit_type ?? "?"} · Status: ${p.permit_status ?? "?"}`,
|
|
`Address: ${addr} · Community ${p.community_area ?? "?"} · Ward ${p.ward ?? "?"}`,
|
|
`Issued: ${p.issue_date ?? "?"}`,
|
|
`Reported cost: $${Number(p.reported_cost ?? 0).toLocaleString()}`,
|
|
`Primary contractor: ${c1 || "unknown"}`,
|
|
c2 ? `Secondary: ${c2}` : "",
|
|
`Owner: ${p.contact_3_name ?? "?"}`,
|
|
`Work description: ${(p.work_description ?? "").slice(0, 800)}`,
|
|
].filter(Boolean).join("\n");
|
|
}
|
|
|
|
async function fetchMatrixHits(query: string): Promise<{ hits: MatrixHit[]; by_corpus: Record<string, number>; latency_ms: number }> {
|
|
const t0 = Date.now();
|
|
const all: MatrixHit[] = [];
|
|
const byCorpus: Record<string, number> = {};
|
|
await Promise.all(CONTRACT_CORPORA.map(async (idx) => {
|
|
try {
|
|
const r = await fetch(`${GATEWAY}/vectors/search`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({ index_name: idx, query, top_k: 3 }),
|
|
signal: AbortSignal.timeout(15000),
|
|
});
|
|
if (!r.ok) { byCorpus[idx] = -1; return; }
|
|
const data: any = await r.json();
|
|
const results = data.results ?? [];
|
|
byCorpus[idx] = results.length;
|
|
for (const h of results) {
|
|
all.push({
|
|
source_corpus: idx,
|
|
score: Number(h.score ?? 0),
|
|
doc_id: String(h.doc_id ?? "?"),
|
|
text: String(h.chunk_text ?? "").slice(0, 400),
|
|
});
|
|
}
|
|
} catch { byCorpus[idx] = -1; }
|
|
}));
|
|
all.sort((a, b) => b.score - a.score);
|
|
return { hits: all.slice(0, 10), by_corpus: byCorpus, latency_ms: Date.now() - t0 };
|
|
}
|
|
|
|
function buildMatrixPreamble(hits: MatrixHit[]): string {
|
|
if (hits.length === 0) return "";
|
|
const lines = [
|
|
`═══ 📖 MATRIX CONTEXT — ${hits.length} relevant hits across the knowledge base ═══`,
|
|
"Reference material from prior contractor data, SEC tickers, LLM team analyses, and distilled procedures. Use as evidence; do NOT invent.",
|
|
"",
|
|
];
|
|
for (let i = 0; i < hits.length; i++) {
|
|
const h = hits[i];
|
|
lines.push(`[${i + 1}] ${h.source_corpus} (score=${h.score.toFixed(2)}, doc=${h.doc_id}): ${h.text.replace(/\s+/g, " ").trim()}`);
|
|
}
|
|
lines.push("═══");
|
|
lines.push("");
|
|
return lines.join("\n");
|
|
}
|
|
|
|
async function chat(model: string, prompt: string): Promise<{ content: string; error?: string }> {
|
|
try {
|
|
const r = await fetch(`${GATEWAY}/v1/chat`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
provider: "openrouter",
|
|
model,
|
|
messages: [{ role: "user", content: prompt }],
|
|
max_tokens: 1500,
|
|
temperature: 0.1,
|
|
}),
|
|
signal: AbortSignal.timeout(90000),
|
|
});
|
|
if (!r.ok) return { content: "", error: `HTTP ${r.status}: ${(await r.text()).slice(0, 200)}` };
|
|
const j: any = await r.json();
|
|
return { content: j.choices?.[0]?.message?.content ?? "" };
|
|
} catch (e: any) { return { content: "", error: e.message }; }
|
|
}
|
|
|
|
async function observerReview(input: { permit_id: string; model: string; response: string; permit_text: string }): Promise<{ verdict: string; confidence: number; notes: string; source: string }> {
|
|
try {
|
|
const r = await fetch(`${OBSERVER}/review`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
file_path: `chicago_permit/${input.permit_id}`,
|
|
model: input.model,
|
|
response: input.response,
|
|
source_content: input.permit_text,
|
|
grounding_stats: { total: 0, grounded: 0, groundedPct: null },
|
|
attempt: 1,
|
|
}),
|
|
signal: AbortSignal.timeout(90000),
|
|
});
|
|
if (!r.ok) return { verdict: "accept", confidence: 50, notes: `observer ${r.status}`, source: "fallthrough" };
|
|
return await r.json();
|
|
} catch (e: any) { return { verdict: "accept", confidence: 50, notes: `observer error: ${e.message}`, source: "fallthrough" }; }
|
|
}
|
|
|
|
// Full pipeline for a single permit: render permit text → retrieve matrix
// context → prompt the model → observer review → structured result record.
// Returns { ok: false, error } on chat failure; otherwise a flat record that
// main() appends to the JSONL output. Side-effect order (log lines) is part
// of the operator-facing contract.
async function analyzeOne(p: Permit, idx: number, total: number): Promise<any> {
  const permit_id = p.permit_ ?? `unknown_${idx}`;
  const t0 = Date.now();
  log(`══ permit ${idx + 1}/${total} · ${permit_id} · type=${p.permit_type} · cost=$${Number(p.reported_cost ?? 0).toLocaleString()}`);
  const permitText = permitToText(p);

  // Build matrix query: combine type + work description + contractor name for retrieval anchoring
  const c1 = (p as any).contact_1_name ?? (typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? ""));
  const matrixQuery = `${p.permit_type ?? ""} ${(p.work_description ?? "").slice(0, 300)} ${c1}`;
  const matrix = await fetchMatrixHits(matrixQuery);
  // Per-corpus hit counts, keyed by corpus name with the version suffix stripped.
  const corporaSummary = Object.entries(matrix.by_corpus).map(([k, v]) => `${k.split("_v")[0]}=${v}`).join(" ");
  log(`  📖 matrix: ${matrix.hits.length} hits in ${matrix.latency_ms}ms · ${corporaSummary}`);

  // Preamble is "" when no hits, so unconditional prepend is safe.
  const preamble = buildMatrixPreamble(matrix.hits);
  const task = `${preamble}You are a staffing-intelligence analyst reviewing a real Chicago building permit. Using the MATRIX CONTEXT above as evidence, produce a structured analysis:

PERMIT:
${permitText}

Produce a markdown analysis with:
1. **Permit summary** — 2 sentences on what this is
2. **Contractor signal** — what we know about the named contractor(s) from matrix context (cite [N] hits). If unknown, say so.
3. **Staffing fit** — what trades/headcount/skills this permit implies
4. **Risk flags** — anything in matrix context that suggests caution (debarment, prior incidents, low-quality history). If none, say so.
5. **Opportunity score** — 0-100 with one-sentence rationale

Cite matrix hits as [N] inline. If matrix has no relevant hits, say "no matrix evidence" — do NOT invent contractor history.`;

  const resp = await chat("x-ai/grok-4.1-fast", task);
  if (resp.error) {
    log(`  ✗ chat error: ${resp.error.slice(0, 100)}`);
    // Failure record still carries permit_id + timestamp so the JSONL row is traceable.
    return { permit_id, ok: false, error: resp.error, ts: new Date().toISOString() };
  }
  log(`  ✓ analysis ${resp.content.length} chars`);

  // Observer fail-open: observerReview returns an "accept"/fallthrough verdict on error.
  const verdict = await observerReview({ permit_id, model: "openrouter/x-ai/grok-4.1-fast", response: resp.content, permit_text: permitText });
  log(`  observer: ${verdict.verdict} (conf=${verdict.confidence}, src=${verdict.source})`);

  // Flat record consumed by main()'s summary (ok / observer_verdict / matrix_hits).
  return {
    permit_id, ok: true,
    permit_type: p.permit_type, cost: Number(p.reported_cost ?? 0),
    contractor: c1, matrix_hits: matrix.hits.length, matrix_corpora: matrix.by_corpus, matrix_ms: matrix.latency_ms,
    analysis: resp.content,
    observer_verdict: verdict.verdict, observer_conf: verdict.confidence, observer_notes: verdict.notes, observer_src: verdict.source,
    duration_ms: Date.now() - t0, ts: new Date().toISOString(),
  };
}
|
|
|
|
async function main() {
|
|
const n = Number(process.argv[2] ?? 5);
|
|
log(`fetching ${n} permits from raw bucket...`);
|
|
const permits = await fetchPermits(n);
|
|
log(`analyzing ${permits.length} permits sequentially...`);
|
|
|
|
const fs = await import("node:fs/promises");
|
|
const { appendFile } = fs;
|
|
const results: any[] = [];
|
|
for (let i = 0; i < permits.length; i++) {
|
|
const r = await analyzeOne(permits[i], i, permits.length);
|
|
results.push(r);
|
|
await appendFile(OUTPUT, JSON.stringify(r) + "\n");
|
|
}
|
|
|
|
log(`\n══ SUMMARY ══`);
|
|
const ok = results.filter(r => r.ok).length;
|
|
const accepted = results.filter(r => r.observer_verdict === "accept").length;
|
|
const cycled = results.filter(r => r.observer_verdict === "cycle").length;
|
|
const rejected = results.filter(r => r.observer_verdict === "reject").length;
|
|
const avgHits = results.reduce((a, r) => a + (r.matrix_hits ?? 0), 0) / Math.max(1, results.length);
|
|
log(` permits analyzed: ${ok}/${results.length}`);
|
|
log(` observer: accept=${accepted} cycle=${cycled} reject=${rejected}`);
|
|
log(` avg matrix hits per permit: ${avgHits.toFixed(1)}`);
|
|
log(` output: ${OUTPUT}`);
|
|
}
|
|
|
|
// Kick off the pipeline; any unhandled rejection is fatal and exits non-zero.
// Fix: `e.message` on a non-Error rejection printed "FATAL: undefined".
main().catch((e: unknown) => {
  console.error(`FATAL: ${e instanceof Error ? e.message : String(e)}`);
  process.exit(1);
});
|