lakehouse/tests/multi-agent/memory_query.ts
root 52561d10d3 Input normalizer + unified memory query — "seamless with whatever input"
J asked directly: "did we implement our memory findings so that our
knowledge base and our configuration playbook [work] seamlessly with
whatever input they're given?" Honest answer tonight was "one of five
findings shipped, normalizer is the blocker." This closes that gap.

NORMALIZER (tests/multi-agent/normalize.ts):
Accepts structured JSON, natural language, or mixed. Returns canonical
NormalizedInput { role, city, state, count, client, deadline, intent,
confidence, extraction_method, missing_fields } for any downstream
consumer.

Three-tier path:
  1. Structured fast-path — already-shaped input skips LLM
  2. Regex path — "need 3 welders in Nashville, TN" parses without LLM.
     City/state parser tightened to 1-3 capitalized words + "in {city}"
     anchor preference + case-exact full-state-name variants to prevent
     "Forklift Operators in Chicago" being captured as the city name
  3. LLM fallback — qwen3 local with think:false + 400 max_tokens for
     inputs the regex can't handle

Unit tests (tests/multi-agent/normalize.test.ts): 9/9 pass. Covers
structured fast-path, misplacement→rescue intent, state-name→abbrev
conversion, regex extraction from natural language, plural role +
full state name edge case, rescue intent keyword precedence, partial
input reporting missing fields, empty object fallthrough, async/sync
parity on clean inputs.

UNIFIED MEMORY QUERY (tests/multi-agent/memory_query.ts):
One function, a parallel fan-out to five memory sources plus a derived
pattern scan, one bundle returned:
  - playbook_workers — hybrid_search via gateway with use_playbook_memory
  - pathway_recommendation — KB recommender for this sig
  - neighbor_signatures — K-NN sigs weighted by staffer competence
  - prior_lessons — T3 overseer lessons filtered by city/state
  - top_staffers — competence-sorted leaderboard
  - discovered_patterns — top workers endorsed across past playbooks
    for this (role, city, state)
  - latency_ms — per-source + total
Every branch is best-effort: one source down doesn't break the bundle.

HTTP ENDPOINT (mcp-server/index.ts):
  POST /memory/query with body {input: <anything>} → MemoryQueryResult
Returns the same shape the TS function does. Typed with types.ts for
future UI consumption.

VERIFIED:
  curl POST /memory/query with structured {role,city,state,count}
    → extraction_method=structured, 10 playbook workers, top score 0.878
  curl POST /memory/query with "I need 3 welders in Nashville, TN"
    → extraction_method=regex (no LLM call), 319ms total, 8 endorsements
      for Lauren Gomez auto-discovered as top Nashville Welder

Honest remaining gaps (documented for next phase):
  - Mem0 ADD/UPDATE/DELETE/NOOP — we still only ADD + mark_failed
  - Zep validity windows — playbook entries have timestamps but no
    retirement semantic
  - Letta working-memory / hot cache — every query scans all 1560
    playbook entries
  - Memory profiles / scoped queries — global pool, no per-staffer
    private subsets

2 of 5 findings now shipped (multi-strategy retrieval in Rust, input
normalization + unified query in TS). The remaining 3 are architectural
additions queued as Phase 25 items — validity windows first since it's
the most load-bearing for long-running systems.
2026-04-20 23:59:05 -05:00

240 lines
8.6 KiB
TypeScript

// Unified memory query — one surface that takes any input, normalizes
// it, and returns every memory signal the system has: playbook workers,
// KB pathway recommendations, prior lessons, staffer competence stats,
// and cross-staffer discovered patterns. This is the "seamlessly with
// whatever input" answer J framed it as.
//
// Why a unified gateway instead of separate calls: the memory surfaces
// are semantically related (playbook_memory workers reference runs the
// KB tracks; prior_lessons reference staffers the stats rank). Callers
// shouldn't have to know the topology.
import { readFile, readdir } from "node:fs/promises";
import { join } from "node:path";
import { normalizeInput, type NormalizedInput } from "./normalize.ts";
import { findNeighbors, loadRecommendation, loadStafferStats, computeSignature, type StafferStats, type PathwayRecommendation } from "./kb.ts";
import { GATEWAY, SIDECAR } from "./agent.ts";
// Everything the system "remembers" about one request, returned as a
// single bundle by queryMemory(). Every field except `input` is
// best-effort: a failed source yields an empty array / null rather
// than rejecting the whole query.
export interface MemoryQueryResult {
// Canonical normalized form of whatever the caller passed in
// (structured fast-path, regex, or LLM fallback — see normalize.ts).
input: NormalizedInput;
// Hybrid-search worker hits from the gateway with playbook-memory boost.
playbook_workers: Array<{
doc_id: string;
// Parsed from the leading "—"-delimited segment of the hit's chunk_text.
name: string;
score: number;
playbook_boost: number;
playbook_citations: string[];
}>;
// KB recommender output for this scenario signature; null when the
// input lacks role/city/state or the source failed.
pathway_recommendation: PathwayRecommendation | null;
// K-nearest scenario signatures, weighted by staffer competence.
neighbor_signatures: Array<{
sig_hash: string;
events_digest: string;
similarity: number;
weighted_score: number;
best_staffer_id: string | null;
}>;
// Up to 3 prior T3 overseer lessons matching the query city/state.
prior_lessons: Array<{
date: string;
client: string;
cities: string;
// Truncated to 500 chars by loadRelevantLessons.
lesson: string;
}>;
// Top 5 staffers by competence_score (only staffers with recorded runs).
top_staffers: StafferStats[];
// Workers repeatedly endorsed across past successful playbooks for
// this signature; null when none were found.
discovered_patterns: {
sig_hash: string;
top_workers: Array<{ name: string; endorsements: number }>;
} | null;
// Wall-clock timings, per memory source plus overall total.
latency_ms: {
normalize: number;
playbook_search: number;
kb_neighbors: number;
staffer_stats: number;
prior_lessons: number;
total: number;
};
}
// Main entry. Normalizes the input, then fans out to every memory
// source in parallel. Each source is best-effort: a down dependency
// just leaves its field empty rather than breaking the query.
export async function queryMemory(raw: unknown): Promise<MemoryQueryResult> {
const t0 = Date.now();
// Normalize input first — sets the tone for every downstream call.
const tNorm = Date.now();
const input = await normalizeInput(raw);
const normalize_ms = Date.now() - tNorm;
// Synthesize a minimal scenario spec from normalized input for sig
// computation + KB lookup. Only needed if we have at least role + city.
const pseudoSpec = (input.role && input.city && input.state) ? {
client: input.client ?? "(unknown)",
events: [{
kind: input.intent === "rescue" ? "misplacement" : "baseline_fill",
role: input.role,
count: input.count ?? 1,
city: input.city,
state: input.state,
}],
} : null;
// Fire everything that CAN fire in parallel.
const tPb = Date.now();
const playbookPromise: Promise<MemoryQueryResult["playbook_workers"]> =
(input.role && input.city && input.state)
? fetch(`${GATEWAY}/vectors/hybrid`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
index_name: "workers_500k_v1",
sql_filter: `role = '${input.role.replace(/'/g, "''")}' AND city = '${input.city.replace(/'/g, "''")}' AND state = '${input.state}' AND CAST(availability AS DOUBLE) > 0.5`,
question: `${input.role} ${input.city} ${input.state}`,
top_k: 10,
generate: false,
use_playbook_memory: true,
playbook_memory_k: 100,
}),
}).then(r => r.ok ? r.json() : { sources: [] })
.then((d: any) => (d.sources ?? []).map((s: any) => ({
doc_id: s.doc_id,
name: s.chunk_text?.split("—")[0]?.trim() ?? "?",
score: s.score ?? 0,
playbook_boost: s.playbook_boost ?? 0,
playbook_citations: s.playbook_citations ?? [],
})))
.catch(() => [])
: Promise.resolve([]);
const tKb = Date.now();
const recPromise = pseudoSpec
? loadRecommendation(pseudoSpec).catch(() => null)
: Promise.resolve(null);
const neighborsPromise = pseudoSpec
? findNeighbors(pseudoSpec, 5).catch(() => [])
: Promise.resolve([]);
const tLess = Date.now();
const lessonsPromise: Promise<MemoryQueryResult["prior_lessons"]> =
(input.city && input.state)
? loadRelevantLessons(input.city, input.state).catch(() => [])
: Promise.resolve([]);
const tStaff = Date.now();
const staffersPromise = loadStafferStats().catch(() => []);
// Parallel await
const [playbook_workers, pathway_recommendation, neighbors_raw, prior_lessons, staffers] =
await Promise.all([playbookPromise, recPromise, neighborsPromise, lessonsPromise, staffersPromise]);
// Derived: top-k staffers by competence, from recent activity.
const top_staffers = staffers
.filter(s => s.total_runs > 0)
.sort((a, b) => b.competence_score - a.competence_score)
.slice(0, 5);
// Derived: discovered patterns for this sig (workers endorsed ≥ 2 staffers).
let discovered_patterns: MemoryQueryResult["discovered_patterns"] = null;
if (pseudoSpec) {
const sig_hash = computeSignature(pseudoSpec);
const workers = await collectTopEndorsedWorkers(sig_hash, input.role, input.city, input.state).catch(() => []);
if (workers.length > 0) discovered_patterns = { sig_hash, top_workers: workers };
}
return {
input,
playbook_workers,
pathway_recommendation,
neighbor_signatures: neighbors_raw.map(n => ({
sig_hash: n.sig.sig_hash,
events_digest: n.sig.events_digest,
similarity: n.similarity,
weighted_score: n.weighted_score,
best_staffer_id: n.best_staffer_id,
})),
prior_lessons,
top_staffers,
discovered_patterns,
latency_ms: {
normalize: normalize_ms,
playbook_search: Date.now() - tPb,
kb_neighbors: Date.now() - tKb,
staffer_stats: Date.now() - tStaff,
prior_lessons: Date.now() - tLess,
total: Date.now() - t0,
},
};
}
// Load up to 3 most-recent T3 overseer lessons whose recorded location
// matches the query city (or state). Lesson files live under
// data/_playbook_lessons/*.json with {date, client, cities, states,
// lesson}; `cities` and `states` are comma-separated lists.
//
// Fixes vs. previous revision:
//   - list entries are trimmed before matching, so a record with
//     cities "Nashville, Memphis" now matches city "Memphis" (the raw
//     split left a leading space and silently dropped such lessons)
//   - a record with a missing `lesson` field no longer throws inside
//     the per-file try (which silently discarded the whole record)
async function loadRelevantLessons(city: string, state: string | null): Promise<MemoryQueryResult["prior_lessons"]> {
  try {
    const dir = join("data", "_playbook_lessons");
    const files = await readdir(dir).catch(() => [] as string[]);
    const out: MemoryQueryResult["prior_lessons"] = [];
    for (const f of files) {
      if (!f.endsWith(".json")) continue;
      try {
        const raw = await readFile(join(dir, f), "utf8");
        const rec = JSON.parse(raw);
        // Trim each entry: stored lists are human-written ("A, B, C").
        const lessonCities = String(rec.cities ?? "").split(",").map((s: string) => s.trim());
        const lessonStates = String(rec.states ?? "").split(",").map((s: string) => s.trim());
        if (lessonCities.includes(city) || (state && lessonStates.includes(state))) {
          out.push({
            date: rec.date,
            client: rec.client,
            cities: rec.cities,
            // Cap lesson text so one verbose record can't bloat the bundle.
            lesson: String(rec.lesson ?? "").slice(0, 500),
          });
        }
      } catch { /* skip malformed */ }
    }
    // Most recent first; date may be absent on historical records.
    out.sort((a, b) => (b.date ?? "").localeCompare(a.date ?? ""));
    return out.slice(0, 3);
  } catch {
    return [];
  }
}
// Count endorsements for workers who appeared in past successful
// playbooks for this (role, city, state): each appearance in a
// successful fill is one endorsement; top 5 returned.
//
// `sig_hash` is not used in the query itself — callers pass it for
// parity with the discovered_patterns result shape — kept for
// interface stability.
//
// Fix vs. previous revision: `state` is quote-escaped in the SQL LIKE
// pattern like role/city (it was previously interpolated raw).
async function collectTopEndorsedWorkers(
  sig_hash: string,
  role: string | null,
  city: string | null,
  state: string | null,
): Promise<Array<{ name: string; endorsements: number }>> {
  if (!role || !city || !state) return [];
  // SQL string-literal escaping for every user-derived field.
  const esc = (s: string) => s.replace(/'/g, "''");
  // Pull from playbook_memory — each successful fill was endorsed;
  // count by (name, role, city, state) across past playbooks.
  try {
    const resp = await fetch(`${GATEWAY}/query/sql`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        sql: `SELECT operation, result FROM successful_playbooks_live
              WHERE operation LIKE 'fill: ${esc(role)} %'
                AND operation LIKE '%in ${esc(city)}, ${esc(state)}%'
              ORDER BY timestamp DESC LIMIT 50`,
      }),
    });
    if (!resp.ok) return [];
    const data: any = await resp.json();
    // successful_playbooks_live.result has the shape
    // "N/N filled → Name A, Name B, Name C"
    // or historical "Name A | Name B | Name C". Handle both.
    const counts = new Map<string, number>();
    for (const row of data.rows ?? []) {
      const raw = String(row.result ?? "");
      const afterArrow = raw.includes("→") ? raw.split("→")[1] : raw;
      const names = afterArrow
        .split(/[,|]/)
        .map((n: string) => n.trim())
        // Drop the "N/N filled" prefix fragment and empty segments.
        .filter(n => n && !/^\d+\/\d+/.test(n) && !n.toLowerCase().startsWith("filled"));
      for (const n of names) counts.set(n, (counts.get(n) ?? 0) + 1);
    }
    return [...counts.entries()]
      .sort((a, b) => b[1] - a[1])
      .slice(0, 5)
      .map(([name, endorsements]) => ({ name, endorsements }));
  } catch {
    return [];
  }
}