J asked directly: "did we implement our memory findings so that our
knowledge base and our configuration playbook [work] seamlessly with
whatever input they're given?" Honest answer tonight was "one of five
findings shipped, normalizer is the blocker." This closes that gap.
NORMALIZER (tests/multi-agent/normalize.ts):
Accepts structured JSON, natural language, or mixed. Returns canonical
NormalizedInput { role, city, state, count, client, deadline, intent,
confidence, extraction_method, missing_fields } for any downstream
consumer.
Three-tier path:
1. Structured fast-path — already-shaped input skips LLM
2. Regex path — "need 3 welders in Nashville, TN" parses without LLM.
City/state parser tightened to 1-3 capitalized words + "in {city}"
anchor preference + case-exact full-state-name variants to prevent
"Forklift Operators in Chicago" being captured as the city name
3. LLM fallback — qwen3 local with think:false + 400 max_tokens for
inputs the regex can't handle
Unit tests (tests/multi-agent/normalize.test.ts): 9/9 pass. Covers
structured fast-path, misplacement→rescue intent, state-name→abbrev
conversion, regex extraction from natural language, plural role +
full state name edge case, rescue intent keyword precedence, partial
input reporting missing fields, empty object fallthrough, async/sync
parity on clean inputs.
UNIFIED MEMORY QUERY (tests/multi-agent/memory_query.ts):
One function, five parallel fan-outs plus one derived pattern scan, one bundle returned:
- playbook_workers — hybrid_search via gateway with use_playbook_memory
- pathway_recommendation — KB recommender for this sig
- neighbor_signatures — K-NN sigs weighted by staffer competence
- prior_lessons — T3 overseer lessons filtered by city/state
- top_staffers — competence-sorted leaderboard
- discovered_patterns — top workers endorsed across past playbooks
for this (role, city, state)
- latency_ms — per-source + total
Every branch is best-effort: one source down doesn't break the bundle.
HTTP ENDPOINT (mcp-server/index.ts):
POST /memory/query with body {input: <anything>} → MemoryQueryResult
Returns the same shape the TS function does. Typed with types.ts for
future UI consumption.
VERIFIED:
curl POST /memory/query with structured {role,city,state,count}
→ extraction_method=structured, 10 playbook workers, top score 0.878
curl POST /memory/query with "I need 3 welders in Nashville, TN"
→ extraction_method=regex (no LLM call), 319ms total, 8 endorsements
for Lauren Gomez auto-discovered as top Nashville Welder
Honest remaining gaps (documented for next phase):
- Mem0 ADD/UPDATE/DELETE/NOOP — we still only ADD + mark_failed
- Zep validity windows — playbook entries have timestamps but no
retirement semantic
- Letta working-memory / hot cache — every query scans all 1560
playbook entries
- Memory profiles / scoped queries — global pool, no per-staffer
private subsets
2 of 5 findings now shipped (multi-strategy retrieval in Rust, input
normalization + unified query in TS). The remaining 3 are architectural
additions queued as Phase 25 items — validity windows first since it's
the most load-bearing for long-running systems.
240 lines · 8.6 KiB · TypeScript
// Unified memory query — one surface that takes any input, normalizes
// it, and returns every memory signal the system has: playbook workers,
// KB pathway recommendations, prior lessons, staffer competence stats,
// and cross-staffer discovered patterns. This is the "seamlessly with
// whatever input" answer J framed it as.
//
// Why a unified gateway instead of separate calls: the memory surfaces
// are semantically related (playbook_memory workers reference runs the
// KB tracks; prior_lessons reference staffers the stats rank). Callers
// shouldn't have to know the topology.
|
|
import { readFile, readdir } from "node:fs/promises";
|
|
import { join } from "node:path";
|
|
import { normalizeInput, type NormalizedInput } from "./normalize.ts";
|
|
import { findNeighbors, loadRecommendation, loadStafferStats, computeSignature, type StafferStats, type PathwayRecommendation } from "./kb.ts";
|
|
import { GATEWAY, SIDECAR } from "./agent.ts";
|
|
|
|
/**
 * Bundle returned by {@link queryMemory}: the normalized input plus every
 * memory signal fetched (or derived) for it. Every field is best-effort —
 * a source that was down or not applicable leaves its field empty/null
 * rather than failing the whole query.
 */
export interface MemoryQueryResult {
  /** Canonical form of whatever the caller passed in (see normalize.ts). */
  input: NormalizedInput;
  /** Workers from the gateway hybrid search with playbook-memory boosting. */
  playbook_workers: Array<{
    doc_id: string;
    /** Parsed from the leading segment of chunk_text ("Name — details"). */
    name: string;
    score: number;
    /** Extra score contributed by playbook memory (0 when none). */
    playbook_boost: number;
    playbook_citations: string[];
  }>;
  /** KB recommendation for this scenario signature; null if not derivable. */
  pathway_recommendation: PathwayRecommendation | null;
  /** K-NN scenario signatures, weighted by staffer competence. */
  neighbor_signatures: Array<{
    sig_hash: string;
    events_digest: string;
    similarity: number;
    weighted_score: number;
    best_staffer_id: string | null;
  }>;
  /** Up to 3 most recent lessons matching the input's city/state. */
  prior_lessons: Array<{
    date: string;
    client: string;
    cities: string;
    /** Lesson text, capped at 500 chars at load time. */
    lesson: string;
  }>;
  /** Top 5 staffers by competence_score (only those with runs). */
  top_staffers: StafferStats[];
  /** Workers repeatedly endorsed in past playbooks for this signature. */
  discovered_patterns: {
    sig_hash: string;
    top_workers: Array<{ name: string; endorsements: number }>;
  } | null;
  /** Per-source and total wall-clock timings, in milliseconds. */
  latency_ms: {
    normalize: number;
    playbook_search: number;
    kb_neighbors: number;
    staffer_stats: number;
    prior_lessons: number;
    total: number;
  };
}
|
|
|
|
// Main entry. Normalizes the input, then fans out to every memory
|
|
// source in parallel. Each source is best-effort: a down dependency
|
|
// just leaves its field empty rather than breaking the query.
|
|
export async function queryMemory(raw: unknown): Promise<MemoryQueryResult> {
|
|
const t0 = Date.now();
|
|
|
|
// Normalize input first — sets the tone for every downstream call.
|
|
const tNorm = Date.now();
|
|
const input = await normalizeInput(raw);
|
|
const normalize_ms = Date.now() - tNorm;
|
|
|
|
// Synthesize a minimal scenario spec from normalized input for sig
|
|
// computation + KB lookup. Only needed if we have at least role + city.
|
|
const pseudoSpec = (input.role && input.city && input.state) ? {
|
|
client: input.client ?? "(unknown)",
|
|
events: [{
|
|
kind: input.intent === "rescue" ? "misplacement" : "baseline_fill",
|
|
role: input.role,
|
|
count: input.count ?? 1,
|
|
city: input.city,
|
|
state: input.state,
|
|
}],
|
|
} : null;
|
|
|
|
// Fire everything that CAN fire in parallel.
|
|
const tPb = Date.now();
|
|
const playbookPromise: Promise<MemoryQueryResult["playbook_workers"]> =
|
|
(input.role && input.city && input.state)
|
|
? fetch(`${GATEWAY}/vectors/hybrid`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
index_name: "workers_500k_v1",
|
|
sql_filter: `role = '${input.role.replace(/'/g, "''")}' AND city = '${input.city.replace(/'/g, "''")}' AND state = '${input.state}' AND CAST(availability AS DOUBLE) > 0.5`,
|
|
question: `${input.role} ${input.city} ${input.state}`,
|
|
top_k: 10,
|
|
generate: false,
|
|
use_playbook_memory: true,
|
|
playbook_memory_k: 100,
|
|
}),
|
|
}).then(r => r.ok ? r.json() : { sources: [] })
|
|
.then((d: any) => (d.sources ?? []).map((s: any) => ({
|
|
doc_id: s.doc_id,
|
|
name: s.chunk_text?.split("—")[0]?.trim() ?? "?",
|
|
score: s.score ?? 0,
|
|
playbook_boost: s.playbook_boost ?? 0,
|
|
playbook_citations: s.playbook_citations ?? [],
|
|
})))
|
|
.catch(() => [])
|
|
: Promise.resolve([]);
|
|
|
|
const tKb = Date.now();
|
|
const recPromise = pseudoSpec
|
|
? loadRecommendation(pseudoSpec).catch(() => null)
|
|
: Promise.resolve(null);
|
|
|
|
const neighborsPromise = pseudoSpec
|
|
? findNeighbors(pseudoSpec, 5).catch(() => [])
|
|
: Promise.resolve([]);
|
|
|
|
const tLess = Date.now();
|
|
const lessonsPromise: Promise<MemoryQueryResult["prior_lessons"]> =
|
|
(input.city && input.state)
|
|
? loadRelevantLessons(input.city, input.state).catch(() => [])
|
|
: Promise.resolve([]);
|
|
|
|
const tStaff = Date.now();
|
|
const staffersPromise = loadStafferStats().catch(() => []);
|
|
|
|
// Parallel await
|
|
const [playbook_workers, pathway_recommendation, neighbors_raw, prior_lessons, staffers] =
|
|
await Promise.all([playbookPromise, recPromise, neighborsPromise, lessonsPromise, staffersPromise]);
|
|
|
|
// Derived: top-k staffers by competence, from recent activity.
|
|
const top_staffers = staffers
|
|
.filter(s => s.total_runs > 0)
|
|
.sort((a, b) => b.competence_score - a.competence_score)
|
|
.slice(0, 5);
|
|
|
|
// Derived: discovered patterns for this sig (workers endorsed ≥ 2 staffers).
|
|
let discovered_patterns: MemoryQueryResult["discovered_patterns"] = null;
|
|
if (pseudoSpec) {
|
|
const sig_hash = computeSignature(pseudoSpec);
|
|
const workers = await collectTopEndorsedWorkers(sig_hash, input.role, input.city, input.state).catch(() => []);
|
|
if (workers.length > 0) discovered_patterns = { sig_hash, top_workers: workers };
|
|
}
|
|
|
|
return {
|
|
input,
|
|
playbook_workers,
|
|
pathway_recommendation,
|
|
neighbor_signatures: neighbors_raw.map(n => ({
|
|
sig_hash: n.sig.sig_hash,
|
|
events_digest: n.sig.events_digest,
|
|
similarity: n.similarity,
|
|
weighted_score: n.weighted_score,
|
|
best_staffer_id: n.best_staffer_id,
|
|
})),
|
|
prior_lessons,
|
|
top_staffers,
|
|
discovered_patterns,
|
|
latency_ms: {
|
|
normalize: normalize_ms,
|
|
playbook_search: Date.now() - tPb,
|
|
kb_neighbors: Date.now() - tKb,
|
|
staffer_stats: Date.now() - tStaff,
|
|
prior_lessons: Date.now() - tLess,
|
|
total: Date.now() - t0,
|
|
},
|
|
};
|
|
}
|
|
|
|
async function loadRelevantLessons(city: string, state: string | null): Promise<MemoryQueryResult["prior_lessons"]> {
|
|
try {
|
|
const dir = join("data", "_playbook_lessons");
|
|
const files = await readdir(dir).catch(() => [] as string[]);
|
|
const out: MemoryQueryResult["prior_lessons"] = [];
|
|
for (const f of files) {
|
|
if (!f.endsWith(".json")) continue;
|
|
try {
|
|
const raw = await readFile(join(dir, f), "utf8");
|
|
const rec = JSON.parse(raw);
|
|
const lessonCities = (rec.cities ?? "").split(",");
|
|
const lessonStates = (rec.states ?? "").split(",");
|
|
if (lessonCities.includes(city) || (state && lessonStates.includes(state))) {
|
|
out.push({
|
|
date: rec.date,
|
|
client: rec.client,
|
|
cities: rec.cities,
|
|
lesson: rec.lesson.slice(0, 500),
|
|
});
|
|
}
|
|
} catch { /* skip malformed */ }
|
|
}
|
|
out.sort((a, b) => (b.date ?? "").localeCompare(a.date ?? ""));
|
|
return out.slice(0, 3);
|
|
} catch {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
async function collectTopEndorsedWorkers(
|
|
sig_hash: string,
|
|
role: string | null,
|
|
city: string | null,
|
|
state: string | null,
|
|
): Promise<Array<{ name: string; endorsements: number }>> {
|
|
if (!role || !city || !state) return [];
|
|
// Pull from playbook_memory — each successful fill was endorsed;
|
|
// count by (name, role, city, state) across past playbooks.
|
|
try {
|
|
const resp = await fetch(`${GATEWAY}/query/sql`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
sql: `SELECT operation, result FROM successful_playbooks_live
|
|
WHERE operation LIKE 'fill: ${role.replace(/'/g, "''")} %'
|
|
AND operation LIKE '%in ${city.replace(/'/g, "''")}, ${state}%'
|
|
ORDER BY timestamp DESC LIMIT 50`,
|
|
}),
|
|
});
|
|
if (!resp.ok) return [];
|
|
const data: any = await resp.json();
|
|
// successful_playbooks_live.result has the shape
|
|
// "N/N filled → Name A, Name B, Name C"
|
|
// or historical "Name A | Name B | Name C". Handle both.
|
|
const counts = new Map<string, number>();
|
|
for (const row of data.rows ?? []) {
|
|
const raw = String(row.result ?? "");
|
|
const afterArrow = raw.includes("→") ? raw.split("→")[1] : raw;
|
|
const names = afterArrow
|
|
.split(/[,|]/)
|
|
.map((n: string) => n.trim())
|
|
.filter(n => n && !/^\d+\/\d+/.test(n) && !n.toLowerCase().startsWith("filled"));
|
|
for (const n of names) counts.set(n, (counts.get(n) ?? 0) + 1);
|
|
}
|
|
return [...counts.entries()]
|
|
.sort((a, b) => b[1] - a[1])
|
|
.slice(0, 5)
|
|
.map(([name, endorsements]) => ({ name, endorsements }));
|
|
} catch {
|
|
return [];
|
|
}
|
|
}
|