profit ac01fffd9a checkpoint: matrix-agent-validated (2026-04-25)
Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.

WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.

WHAT WAS PROVEN
- Vector retrieval across a multi-corpus matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
    * UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
    * REVISE: chains versions, parent.superseded_at + superseded_by stamped
    * RETIRE: marks specific trace retired with reason, excluded from retrieval
    * HISTORY: walks chain root→tip, cycle-safe

KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces

Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:43:27 -05:00

240 lines
8.6 KiB
TypeScript

// Unified memory query — one surface that takes any input, normalizes
// it, and returns every memory signal the system has: playbook workers,
// KB pathway recommendations, prior lessons, staffer competence stats,
// and cross-staffer discovered patterns. This is the "seamlessly with
// whatever input" answer J framed it as.
//
// Why a unified gateway instead of separate calls: the memory surfaces
// are semantically related (playbook_memory workers reference runs the
// KB tracks; prior_lessons reference staffers the stats rank). Callers
// shouldn't have to know the topology.
import { readFile, readdir } from "node:fs/promises";
import { join } from "node:path";
import { normalizeInput, type NormalizedInput } from "./normalize.ts";
import { findNeighbors, loadRecommendation, loadStafferStats, computeSignature, type StafferStats, type PathwayRecommendation } from "./kb.ts";
import { GATEWAY, SIDECAR } from "./agent.ts";
// Aggregate result of one unified memory query: every memory surface's
// answer plus per-source latency accounting. Best-effort fields are empty
// arrays / null when their backing source is unavailable.
export interface MemoryQueryResult {
// The caller's raw input after normalization (role/city/state/intent/…).
input: NormalizedInput;
// Workers surfaced by the playbook-boosted hybrid vector search.
playbook_workers: Array<{
doc_id: string;
// Display name parsed from the chunk text (portion before the "—").
name: string;
// Base retrieval score from the hybrid search.
score: number;
// Additional boost contributed by playbook memory.
playbook_boost: number;
// IDs of the playbook entries that contributed the boost.
playbook_citations: string[];
}>;
// KB pathway recommendation for the synthesized scenario, if one exists.
pathway_recommendation: PathwayRecommendation | null;
// Nearest scenario signatures from the KB, with similarity scoring.
neighbor_signatures: Array<{
sig_hash: string;
events_digest: string;
similarity: number;
weighted_score: number;
best_staffer_id: string | null;
}>;
// Up to 3 prior lessons matching the input's city or state, newest first.
prior_lessons: Array<{
date: string;
// Client the lesson was recorded for.
client: string;
// Comma-separated city list as stored on the lesson record.
cities: string;
// Lesson text, truncated to 500 chars.
lesson: string;
}>;
// Top 5 staffers by competence score (only those with recorded runs).
top_staffers: StafferStats[];
// Cross-staffer endorsed-worker pattern for this scenario signature,
// or null when none was found / no scenario could be synthesized.
discovered_patterns: {
sig_hash: string;
top_workers: Array<{ name: string; endorsements: number }>;
} | null;
// Wall-clock milliseconds per phase; `total` covers the whole query.
latency_ms: {
normalize: number;
playbook_search: number;
kb_neighbors: number;
staffer_stats: number;
prior_lessons: number;
total: number;
};
}
// Main entry. Normalizes the input, then fans out to every memory
// source in parallel. Each source is best-effort: a down dependency
// just leaves its field empty (or null) rather than breaking the query.
//
// @param raw Arbitrary caller input; `normalizeInput` defines its shape.
// @returns Every memory signal the system has, plus per-source latency.
export async function queryMemory(raw: unknown): Promise<MemoryQueryResult> {
  const t0 = Date.now();

  // Normalize input first — every downstream lookup keys off it.
  const tNorm = Date.now();
  const input = await normalizeInput(raw);
  const normalize_ms = Date.now() - tNorm;

  // Escape a value for interpolation into a single-quoted SQL literal.
  const sqlQuote = (s: string) => s.replace(/'/g, "''");

  // Synthesize a minimal scenario spec from normalized input for sig
  // computation + KB lookup. Only possible with role + city + state.
  const pseudoSpec = (input.role && input.city && input.state) ? {
    client: input.client ?? "(unknown)",
    events: [{
      kind: input.intent === "rescue" ? "misplacement" : "baseline_fill",
      role: input.role,
      count: input.count ?? 1,
      city: input.city,
      state: input.state,
    }],
  } : null;

  // Per-source wall clock, stamped when each promise settles. (Measuring
  // at the end of the whole fan-out would make every figure ≈ total.)
  const durations = { playbook: 0, kb: 0, lessons: 0, staffers: 0 };
  const timed = <T>(key: keyof typeof durations, p: Promise<T>): Promise<T> => {
    const start = Date.now();
    const stamp = () => { durations[key] = Date.now() - start; };
    return p.then(
      v => { stamp(); return v; },
      e => { stamp(); throw e; },
    );
  };

  // Fire everything that CAN fire in parallel.
  const playbookPromise: Promise<MemoryQueryResult["playbook_workers"]> =
    (input.role && input.city && input.state)
      ? timed("playbook",
          fetch(`${GATEWAY}/vectors/hybrid`, {
            method: "POST",
            headers: { "content-type": "application/json" },
            body: JSON.stringify({
              index_name: "workers_500k_v1",
              // All three interpolated values are escaped — state included
              // (it was previously interpolated raw).
              sql_filter: `role = '${sqlQuote(input.role)}' AND city = '${sqlQuote(input.city)}' AND state = '${sqlQuote(input.state)}' AND CAST(availability AS DOUBLE) > 0.5`,
              question: `${input.role} ${input.city} ${input.state}`,
              top_k: 10,
              generate: false,
              use_playbook_memory: true,
              playbook_memory_k: 100,
            }),
          })
            .then(r => r.ok ? r.json() : { sources: [] })
            .then((d: any) => (d.sources ?? []).map((s: any) => ({
              doc_id: s.doc_id,
              // chunk_text looks like "Name — rest"; keep the name part.
              name: s.chunk_text?.split("—")[0]?.trim() ?? "?",
              score: s.score ?? 0,
              playbook_boost: s.playbook_boost ?? 0,
              playbook_citations: s.playbook_citations ?? [],
            })))
            .catch(() => []))
      : Promise.resolve([]);

  const recPromise = pseudoSpec
    ? loadRecommendation(pseudoSpec).catch(() => null)
    : Promise.resolve(null);
  const neighborsPromise = pseudoSpec
    ? timed("kb", findNeighbors(pseudoSpec, 5).catch(() => []))
    : Promise.resolve([]);

  const lessonsPromise: Promise<MemoryQueryResult["prior_lessons"]> =
    (input.city && input.state)
      ? timed("lessons", loadRelevantLessons(input.city, input.state).catch(() => []))
      : Promise.resolve([]);

  const staffersPromise = timed("staffers", loadStafferStats().catch(() => []));

  // Discovered patterns (workers endorsed across staffers for this sig)
  // only needs the pseudo-spec, so it joins the same parallel fan-out
  // instead of running serially after it.
  const patternsPromise: Promise<MemoryQueryResult["discovered_patterns"]> =
    pseudoSpec
      ? (async () => {
          const sig_hash = computeSignature(pseudoSpec);
          const workers = await collectTopEndorsedWorkers(
            sig_hash, input.role, input.city, input.state,
          ).catch(() => []);
          return workers.length > 0 ? { sig_hash, top_workers: workers } : null;
        })()
      : Promise.resolve(null);

  // Parallel await — every source settles independently.
  const [
    playbook_workers,
    pathway_recommendation,
    neighbors_raw,
    prior_lessons,
    staffers,
    discovered_patterns,
  ] = await Promise.all([
    playbookPromise,
    recPromise,
    neighborsPromise,
    lessonsPromise,
    staffersPromise,
    patternsPromise,
  ]);

  // Derived: top-5 staffers by competence, from those with recorded runs.
  // Copy before sorting so the fetched array is not mutated.
  const top_staffers = [...staffers]
    .filter(s => s.total_runs > 0)
    .sort((a, b) => b.competence_score - a.competence_score)
    .slice(0, 5);

  return {
    input,
    playbook_workers,
    pathway_recommendation,
    neighbor_signatures: neighbors_raw.map(n => ({
      sig_hash: n.sig.sig_hash,
      events_digest: n.sig.events_digest,
      similarity: n.similarity,
      weighted_score: n.weighted_score,
      best_staffer_id: n.best_staffer_id,
    })),
    prior_lessons,
    top_staffers,
    discovered_patterns,
    latency_ms: {
      normalize: normalize_ms,
      playbook_search: durations.playbook,
      kb_neighbors: durations.kb,
      staffer_stats: durations.staffers,
      prior_lessons: durations.lessons,
      total: Date.now() - t0,
    },
  };
}
// Scan data/_playbook_lessons/*.json for lessons whose city list contains
// `city` or whose state list contains `state`. Returns at most 3, newest
// date first. Best-effort: a missing directory, unreadable file, or
// malformed JSON is skipped silently; any other failure yields [].
async function loadRelevantLessons(city: string, state: string | null): Promise<MemoryQueryResult["prior_lessons"]> {
  try {
    const dir = join("data", "_playbook_lessons");
    const files = await readdir(dir).catch(() => [] as string[]);
    // Read + parse all lesson files concurrently; malformed → null.
    const records = await Promise.all(
      files
        .filter(f => f.endsWith(".json"))
        .map(async f => {
          try {
            return JSON.parse(await readFile(join(dir, f), "utf8"));
          } catch {
            return null; // skip malformed / unreadable
          }
        }),
    );
    const out: MemoryQueryResult["prior_lessons"] = [];
    for (const rec of records) {
      if (!rec) continue;
      // Trim each token so "Chicago, Springfield" matches "Springfield"
      // (previously the leading space defeated the includes() check).
      const lessonCities = String(rec.cities ?? "").split(",").map((s: string) => s.trim());
      const lessonStates = String(rec.states ?? "").split(",").map((s: string) => s.trim());
      if (lessonCities.includes(city) || (state && lessonStates.includes(state))) {
        out.push({
          date: rec.date,
          client: rec.client,
          cities: rec.cities,
          // Guard: a record without `lesson` previously threw on .slice
          // and silently dropped the whole record.
          lesson: String(rec.lesson ?? "").slice(0, 500),
        });
      }
    }
    out.sort((a, b) => (b.date ?? "").localeCompare(a.date ?? ""));
    return out.slice(0, 3);
  } catch {
    return [];
  }
}
// Count worker-name endorsements for this scenario by scanning recent
// successful playbooks that filled the same (role, city, state), then
// return the 5 most-endorsed names. Best-effort: any failure yields [].
//
// @param sig_hash Scenario signature (carried through for the caller's
//                 result shape; not used in the SQL itself).
// @param role/city/state Scenario scope; all three are required.
async function collectTopEndorsedWorkers(
  sig_hash: string,
  role: string | null,
  city: string | null,
  state: string | null,
): Promise<Array<{ name: string; endorsements: number }>> {
  if (!role || !city || !state) return [];
  // Escape for a single-quoted SQL literal (doubling embedded quotes).
  const esc = (s: string) => s.replace(/'/g, "''");
  // Pull from playbook_memory — each successful fill was endorsed;
  // count by (name, role, city, state) across past playbooks.
  try {
    const resp = await fetch(`${GATEWAY}/query/sql`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        // `state` is escaped like role/city — it was previously
        // interpolated into the LIKE pattern raw.
        sql: `SELECT operation, result FROM successful_playbooks_live
WHERE operation LIKE 'fill: ${esc(role)} %'
AND operation LIKE '%in ${esc(city)}, ${esc(state)}%'
ORDER BY timestamp DESC LIMIT 50`,
      }),
    });
    if (!resp.ok) return [];
    const data: any = await resp.json();
    // successful_playbooks_live.result has the shape
    // "N/N filled → Name A, Name B, Name C"
    // or historical "Name A | Name B | Name C". Handle both.
    const counts = new Map<string, number>();
    for (const row of data.rows ?? []) {
      const raw = String(row.result ?? "");
      // Names follow the arrow when present; otherwise the whole string.
      const afterArrow = raw.includes("→") ? raw.split("→")[1] : raw;
      const names = afterArrow
        .split(/[,|]/)
        .map((n: string) => n.trim())
        // Drop empty tokens, "N/N" counters, and "filled…" fragments.
        .filter(n => n && !/^\d+\/\d+/.test(n) && !n.toLowerCase().startsWith("filled"));
      for (const n of names) counts.set(n, (counts.get(n) ?? 0) + 1);
    }
    return [...counts.entries()]
      .sort((a, b) => b[1] - a[1])
      .slice(0, 5)
      .map(([name, endorsements]) => ({ name, endorsements }));
  } catch {
    return [];
  }
}