Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.
WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.
WHAT WAS PROVEN
- Vector retrieval across the multi-corpus matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iteration retrieval surfaces it as a preamble
- Mem0 versioning + deletion in pathway_memory (sketched after this list):
* UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
* REVISE: chains versions, parent.superseded_at + superseded_by stamped
* RETIRE: marks specific trace retired with reason, excluded from retrieval
* HISTORY: walks chain root→tip, cycle-safe
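A minimal TypeScript sketch of what those four operations mean (the shipped
implementation is Rust in crates/vectord/src/pathway_memory.rs; every name
here — MemoryTrace, upsert, revise, retire, history — is illustrative, not
the crate's API):

interface MemoryTrace {
  id: string;
  workflow_hash: string;        // identity key UPSERT matches on (assumed)
  replay_count: number;
  parent_id: string | null;     // REVISE chains versions through this
  superseded_by: string | null;
  superseded_at: string | null;
  retired_reason: string | null;
}

// A trace is retrievable only if it is neither superseded nor retired.
const live = (t: MemoryTrace) => !t.superseded_by && !t.retired_reason;

// UPSERT: ADD on a new workflow; UPDATE bumps replay_count on an identical one.
function upsert(store: Map<string, MemoryTrace>, t: MemoryTrace): MemoryTrace {
  for (const e of store.values()) {
    if (e.workflow_hash === t.workflow_hash && live(e)) {
      e.replay_count += 1;
      return e;
    }
  }
  store.set(t.id, t);
  return t;
}

// REVISE: the new version chains to its parent; the parent gets stamped.
function revise(store: Map<string, MemoryTrace>, parentId: string, next: MemoryTrace): void {
  const parent = store.get(parentId);
  if (!parent) throw new Error(`unknown trace ${parentId}`);
  next.parent_id = parentId;
  parent.superseded_by = next.id;
  parent.superseded_at = new Date().toISOString();
  store.set(next.id, next);
}

// RETIRE: mark one specific trace retired, with a reason; live() excludes it.
function retire(store: Map<string, MemoryTrace>, id: string, reason: string): void {
  const t = store.get(id);
  if (t) t.retired_reason = reason;
}

// HISTORY: walk the version chain root→tip, cycle-safe via visited sets.
function history(store: Map<string, MemoryTrace>, id: string): MemoryTrace[] {
  // First walk parent_id links up to the root.
  const seen = new Set<string>();
  let root = store.get(id);
  while (root && root.parent_id && store.has(root.parent_id) && !seen.has(root.parent_id)) {
    seen.add(root.id);
    root = store.get(root.parent_id)!;
  }
  // Then follow superseded_by pointers back down, root→tip.
  const chain: MemoryTrace[] = [];
  const walked = new Set<string>();
  let cur: MemoryTrace | undefined = root;
  while (cur && !walked.has(cur.id)) {
    walked.add(cur.id);
    chain.push(cur);
    cur = cur.superseded_by ? store.get(cur.superseded_by) : undefined;
  }
  return chain;
}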
KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces
Replication: see REPLICATION.md for a Debian 13 clean install and a cloud-only
adaptation (no local Ollama).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
TypeScript · 240 lines · 8.6 KiB
// Unified memory query — one surface that takes any input, normalizes
// it, and returns every memory signal the system has: playbook workers,
// KB pathway recommendations, prior lessons, staffer competence stats,
// and cross-staffer discovered patterns. This is the "seamlessly with
// whatever input" answer J framed it as.
//
// Why a unified gateway instead of separate calls: the memory surfaces
// are semantically related (playbook_memory workers reference runs the
// KB tracks; prior_lessons reference staffers the stats rank). Callers
// shouldn't have to know the topology.

import { readFile, readdir } from "node:fs/promises";
import { join } from "node:path";
import { normalizeInput, type NormalizedInput } from "./normalize.ts";
import {
  findNeighbors,
  loadRecommendation,
  loadStafferStats,
  computeSignature,
  type StafferStats,
  type PathwayRecommendation,
} from "./kb.ts";
import { GATEWAY } from "./agent.ts";

export interface MemoryQueryResult {
  input: NormalizedInput;
  playbook_workers: Array<{
    doc_id: string;
    name: string;
    score: number;
    playbook_boost: number;
    playbook_citations: string[];
  }>;
  pathway_recommendation: PathwayRecommendation | null;
  neighbor_signatures: Array<{
    sig_hash: string;
    events_digest: string;
    similarity: number;
    weighted_score: number;
    best_staffer_id: string | null;
  }>;
  prior_lessons: Array<{
    date: string;
    client: string;
    cities: string;
    lesson: string;
  }>;
  top_staffers: StafferStats[];
  discovered_patterns: {
    sig_hash: string;
    top_workers: Array<{ name: string; endorsements: number }>;
  } | null;
  latency_ms: {
    normalize: number;
    playbook_search: number;
    kb_neighbors: number;
    staffer_stats: number;
    prior_lessons: number;
    total: number;
  };
}

// Main entry. Normalizes the input, then fans out to every memory
// source in parallel. Each source is best-effort: a down dependency
// just leaves its field empty rather than breaking the query.
export async function queryMemory(raw: unknown): Promise<MemoryQueryResult> {
  const t0 = Date.now();

  // Normalize input first — sets the tone for every downstream call.
  const tNorm = Date.now();
  const input = await normalizeInput(raw);
  const normalize_ms = Date.now() - tNorm;

  // Synthesize a minimal scenario spec from normalized input for sig
  // computation + KB lookup. Only possible if we have at least role +
  // city + state.
  const pseudoSpec = (input.role && input.city && input.state) ? {
    client: input.client ?? "(unknown)",
    events: [{
      kind: input.intent === "rescue" ? "misplacement" : "baseline_fill",
      role: input.role,
      count: input.count ?? 1,
      city: input.city,
      state: input.state,
    }],
  } : null;
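
  // Illustration (hypothetical values): a normalized input with role
  // "welder", count 2, city "Chicago", state "IL" and any non-"rescue"
  // intent would synthesize:
  //   { client: "(unknown)", events: [{ kind: "baseline_fill",
  //     role: "welder", count: 2, city: "Chicago", state: "IL" }] }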

  // Fire everything that CAN fire in parallel.
  const tPb = Date.now();
  const playbookPromise: Promise<MemoryQueryResult["playbook_workers"]> =
    (input.role && input.city && input.state)
      ? fetch(`${GATEWAY}/vectors/hybrid`, {
          method: "POST",
          headers: { "content-type": "application/json" },
          body: JSON.stringify({
            index_name: "workers_500k_v1",
            sql_filter: `role = '${input.role.replace(/'/g, "''")}' AND city = '${input.city.replace(/'/g, "''")}' AND state = '${input.state.replace(/'/g, "''")}' AND CAST(availability AS DOUBLE) > 0.5`,
            question: `${input.role} ${input.city} ${input.state}`,
            top_k: 10,
            generate: false,
            use_playbook_memory: true,
            playbook_memory_k: 100,
          }),
        }).then(r => r.ok ? r.json() : { sources: [] })
          .then((d: any) => (d.sources ?? []).map((s: any) => ({
            doc_id: s.doc_id,
            name: s.chunk_text?.split("—")[0]?.trim() ?? "?",
            score: s.score ?? 0,
            playbook_boost: s.playbook_boost ?? 0,
            playbook_citations: s.playbook_citations ?? [],
          })))
          .catch(() => [])
      : Promise.resolve([]);

  const tKb = Date.now();
  const recPromise = pseudoSpec
    ? loadRecommendation(pseudoSpec).catch(() => null)
    : Promise.resolve(null);

  const neighborsPromise = pseudoSpec
    ? findNeighbors(pseudoSpec, 5).catch(() => [])
    : Promise.resolve([]);

  const tLess = Date.now();
  const lessonsPromise: Promise<MemoryQueryResult["prior_lessons"]> =
    (input.city && input.state)
      ? loadRelevantLessons(input.city, input.state).catch(() => [])
      : Promise.resolve([]);

  const tStaff = Date.now();
  const staffersPromise = loadStafferStats().catch(() => []);

  // Parallel await
  const [playbook_workers, pathway_recommendation, neighbors_raw, prior_lessons, staffers] =
    await Promise.all([playbookPromise, recPromise, neighborsPromise, lessonsPromise, staffersPromise]);

  // Derived: top-k staffers by competence, from recent activity.
  const top_staffers = staffers
    .filter(s => s.total_runs > 0)
    .sort((a, b) => b.competence_score - a.competence_score)
    .slice(0, 5);

  // Derived: discovered patterns for this sig (workers endorsed by ≥ 2 staffers).
  let discovered_patterns: MemoryQueryResult["discovered_patterns"] = null;
  if (pseudoSpec) {
    const sig_hash = computeSignature(pseudoSpec);
    const workers = await collectTopEndorsedWorkers(sig_hash, input.role, input.city, input.state).catch(() => []);
    if (workers.length > 0) discovered_patterns = { sig_hash, top_workers: workers };
  }

  return {
    input,
    playbook_workers,
    pathway_recommendation,
    neighbor_signatures: neighbors_raw.map(n => ({
      sig_hash: n.sig.sig_hash,
      events_digest: n.sig.events_digest,
      similarity: n.similarity,
      weighted_score: n.weighted_score,
      best_staffer_id: n.best_staffer_id,
    })),
    prior_lessons,
    top_staffers,
    discovered_patterns,
    // Note: the per-source timers start when the promises are created, so
    // each figure is "time from fan-out to all-settled", not the isolated
    // cost of that source.
    latency_ms: {
      normalize: normalize_ms,
      playbook_search: Date.now() - tPb,
      kb_neighbors: Date.now() - tKb,
      staffer_stats: Date.now() - tStaff,
      prior_lessons: Date.now() - tLess,
      total: Date.now() - t0,
    },
  };
}

async function loadRelevantLessons(city: string, state: string | null): Promise<MemoryQueryResult["prior_lessons"]> {
  try {
    const dir = join("data", "_playbook_lessons");
    const files = await readdir(dir).catch(() => [] as string[]);
    const out: MemoryQueryResult["prior_lessons"] = [];
    for (const f of files) {
      if (!f.endsWith(".json")) continue;
      try {
        const raw = await readFile(join(dir, f), "utf8");
        const rec = JSON.parse(raw);
        // Stored as comma-separated strings; trim so "A, B" still matches "B".
        const lessonCities = (rec.cities ?? "").split(",").map((s: string) => s.trim());
        const lessonStates = (rec.states ?? "").split(",").map((s: string) => s.trim());
        if (lessonCities.includes(city) || (state && lessonStates.includes(state))) {
          out.push({
            date: rec.date,
            client: rec.client,
            cities: rec.cities,
            lesson: rec.lesson.slice(0, 500),
          });
        }
      } catch { /* skip malformed */ }
    }
    // Newest first; keep the top three.
    out.sort((a, b) => (b.date ?? "").localeCompare(a.date ?? ""));
    return out.slice(0, 3);
  } catch {
    return [];
  }
}
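
// For reference, a lesson record under data/_playbook_lessons/ is assumed to
// look roughly like the following (shape inferred from the parser above;
// values are hypothetical and the real schema may carry more fields):
//   {
//     "date": "2025-11-02",
//     "client": "(client name)",
//     "cities": "Chicago,Evanston",
//     "states": "IL",
//     "lesson": "(free text; truncated to 500 chars on read)"
//   }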

async function collectTopEndorsedWorkers(
  sig_hash: string,
  role: string | null,
  city: string | null,
  state: string | null,
): Promise<Array<{ name: string; endorsements: number }>> {
  if (!role || !city || !state) return [];
  // Pull from playbook_memory — each successful fill was endorsed;
  // count by (name, role, city, state) across past playbooks.
  try {
    const resp = await fetch(`${GATEWAY}/query/sql`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        sql: `SELECT operation, result FROM successful_playbooks_live
              WHERE operation LIKE 'fill: ${role.replace(/'/g, "''")} %'
                AND operation LIKE '%in ${city.replace(/'/g, "''")}, ${state.replace(/'/g, "''")}%'
              ORDER BY timestamp DESC LIMIT 50`,
      }),
    });
    if (!resp.ok) return [];
    const data: any = await resp.json();
    // successful_playbooks_live.result has the shape
    //   "N/N filled → Name A, Name B, Name C"
    // or the historical "Name A | Name B | Name C". Handle both.
    const counts = new Map<string, number>();
    for (const row of data.rows ?? []) {
      const raw = String(row.result ?? "");
      const afterArrow = raw.includes("→") ? raw.split("→")[1] : raw;
      const names = afterArrow
        .split(/[,|]/)
        .map((n: string) => n.trim())
        .filter(n => n && !/^\d+\/\d+/.test(n) && !n.toLowerCase().startsWith("filled"));
      for (const n of names) counts.set(n, (counts.get(n) ?? 0) + 1);
    }
    return [...counts.entries()]
      .sort((a, b) => b[1] - a[1])
      .slice(0, 5)
      .map(([name, endorsements]) => ({ name, endorsements }));
  } catch {
    return [];
  }
}
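
A minimal usage sketch, assuming the module above is saved as ./memory.ts (the
file name is a guess) and runs under an ESM runtime with top-level await:

// Hypothetical caller: hand the gateway any raw input; it normalizes the
// input, fans out, and any unreachable source just comes back empty.
import { queryMemory } from "./memory.ts";

const result = await queryMemory({
  role: "welder",        // all values illustrative
  city: "Chicago",
  state: "IL",
  count: 2,
  intent: "fill",        // any non-"rescue" intent maps to baseline_fill
});

console.log(`total ${result.latency_ms.total}ms (normalize ${result.latency_ms.normalize}ms)`);
console.log(`playbook workers: ${result.playbook_workers.length}`);
console.log(`neighbor signatures: ${result.neighbor_signatures.length}`);
if (result.pathway_recommendation) console.log("KB pathway recommendation available");
for (const l of result.prior_lessons) console.log(`[${l.date}] ${l.client}: ${l.lesson.slice(0, 80)}`);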