pathway_memory: Mem0 versioning + deletion (upsert/revise/retire/history)

Per J 2026-04-25: pathway_memory was append-only — every agent run added
a new trace, bad/failed runs polluted the matrix forever, and there was
no notion of "this is the canonical evolved playbook." Ported
playbook_memory's Phase 25/27 patterns into pathway_memory so the agent
loop's matrix converges on best-known approaches per task class instead
of bloating.

Fields added to PathwayTrace (all #[serde(default)] for back-compat):
- trace_uid: stable UUID per individual trace within a bucket
- version: u32 default 1
- parent_trace_uid, superseded_at, superseded_by_trace_uid
- retirement_reason (paired with existing retired:bool)
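The back-compat defaults can be sketched as a normalize step over raw JSON (TypeScript sketch; field names mirror the Rust struct, and the `normalize` helper is illustrative, not shipped code):

```typescript
// Sketch of the Mem0 fields added to PathwayTrace, emulating the
// serde(default) behavior so legacy state.json rows load unchanged.
interface PathwayTraceVersioning {
  trace_uid: string;                       // "" on legacy traces
  version: number;                         // defaults to 1
  parent_trace_uid: string | null;         // null = root version
  superseded_at: string | null;            // stamped when a newer version lands
  superseded_by_trace_uid: string | null;  // pairs with superseded_at
  retirement_reason: string | null;        // pairs with retired: true
}

// Legacy traces deserialize with every new field defaulted,
// so old state files need no migration.
function normalize(raw: Partial<PathwayTraceVersioning>): PathwayTraceVersioning {
  return {
    trace_uid: raw.trace_uid ?? "",
    version: raw.version ?? 1,
    parent_trace_uid: raw.parent_trace_uid ?? null,
    superseded_at: raw.superseded_at ?? null,
    superseded_by_trace_uid: raw.superseded_by_trace_uid ?? null,
    retirement_reason: raw.retirement_reason ?? null,
  };
}
```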

Methods added to PathwayMemory:
- upsert(trace) → PathwayUpsertOutcome {Added|Updated|Noop}
  Workflow-fingerprint dedup: ladder_attempts + final_verdict hash.
  Identical workflow → bumps existing replay_count instead of duplicating.
- revise(parent_uid, new_trace) → PathwayReviseOutcome
  Chains versions; rejects retired or already-superseded parents.
- retire(trace_uid, reason) → bool
  Marks specific trace retired with reason. Idempotent.
- history(trace_uid) → Vec<PathwayTrace>
  Walks parent_trace_uid back to root, then superseded_by forward to tip.
  Cycle-safe via visited set.
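The walk history() performs can be sketched over a uid → trace map (TypeScript sketch; `historyChain` and `Link` are illustrative names, the guard logic mirrors the method described above):

```typescript
// Back-to-root then forward-to-tip walk with visited sets, so a
// corrupted chain with a cycle terminates instead of looping.
type Link = {
  trace_uid: string;
  parent_trace_uid: string | null;
  superseded_by_trace_uid: string | null;
};

function historyChain(byUid: Map<string, Link>, traceUid: string): Link[] {
  const seed = byUid.get(traceUid);
  if (!seed) return [];
  // Walk back to root, refusing to revisit a uid (cycle guard).
  const seen = new Set<string>();
  let root = seed;
  while (root.parent_trace_uid !== null) {
    if (seen.has(root.parent_trace_uid)) break;
    seen.add(root.parent_trace_uid);
    const p = byUid.get(root.parent_trace_uid);
    if (!p) break;
    root = p;
  }
  // Walk forward to tip with its own visited set.
  const chain: Link[] = [root];
  const fwd = new Set<string>([root.trace_uid]);
  let cur = root;
  while (cur.superseded_by_trace_uid !== null) {
    const succ = cur.superseded_by_trace_uid;
    if (fwd.has(succ)) break;
    fwd.add(succ);
    const n = byUid.get(succ);
    if (!n) break;
    chain.push(n);
    cur = n;
  }
  return chain;
}
```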

Retrieval gates updated:
- query_hot_swap skips superseded_at.is_some()
- bug_fingerprints_for skips both retired AND superseded
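Both gates reduce to one liveness predicate (TypeScript sketch; `isLive`/`liveTraces` are illustrative names):

```typescript
// A trace participates in retrieval only if it is neither retired
// nor superseded — only the live tip of each version chain counts.
type Liveness = { retired: boolean; superseded_at: string | null };

function isLive(t: Liveness): boolean {
  return !t.retired && t.superseded_at === null;
}

function liveTraces<T extends Liveness>(traces: T[]): T[] {
  return traces.filter(isLive);
}
```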

HTTP endpoints in service.rs:
- POST /vectors/pathway/upsert
- POST /vectors/pathway/retire
- POST /vectors/pathway/revise
- GET  /vectors/pathway/history/{trace_uid}
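A hedged client-side sketch of the four calls. Request shapes follow the retire/revise handler structs in service.rs; the gateway base URL matches the seal script's default, and `pathwayOps`/`send` are illustrative names, not shipped code:

```typescript
// Request descriptors for the four Mem0 endpoints. Building the
// descriptor is separated from fetch() so the shapes are easy to test.
const GATEWAY = "http://localhost:3100";

type Req = { method: "GET" | "POST"; url: string; body?: unknown };

const pathwayOps = {
  upsert: (trace: unknown): Req =>
    ({ method: "POST", url: `${GATEWAY}/vectors/pathway/upsert`, body: trace }),
  retire: (trace_uid: string, reason: string): Req =>
    ({ method: "POST", url: `${GATEWAY}/vectors/pathway/retire`, body: { trace_uid, reason } }),
  revise: (parent_trace_uid: string, new_trace: unknown): Req =>
    ({ method: "POST", url: `${GATEWAY}/vectors/pathway/revise`, body: { parent_trace_uid, new_trace } }),
  history: (trace_uid: string): Req =>
    ({ method: "GET", url: `${GATEWAY}/vectors/pathway/history/${trace_uid}` }),
};

// Dispatch helper: a descriptor feeds straight into fetch().
async function send(req: Req): Promise<Response> {
  return fetch(req.url, {
    method: req.method,
    headers: req.body ? { "content-type": "application/json" } : undefined,
    body: req.body ? JSON.stringify(req.body) : undefined,
  });
}
```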

scripts/seal_agent_playbook.ts switched insert→upsert + accepts SESSION_DIR
arg so it can seal any archived session, not just iter4.

Verified live (4/4 ops):
- UPSERT first run: Added trace_uid 542ae53f
- UPSERT identical: Updated, replay_count bumped 0→1 (no duplicate)
- REVISE 542ae53f→87a70a61: parent stamped superseded_at, v2 created
- HISTORY of v2: chain_len=2, v1 superseded, v2 tip
- RETIRE iter-6 broken trace: retired=true, retirement_reason preserved
- pathway_memory.stats: total=79, retired=1, reuse_rate=0.0127
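The UPSERT dedup verified above hinges on the workflow fingerprint; a TypeScript sketch of the same hash (node:crypto in place of the Rust Sha256; `workflowFingerprint` is an illustrative port, not the shipped code):

```typescript
import { createHash } from "node:crypto";

// Hash of final_verdict plus the ordered (model, rung) sequence from
// ladder_attempts. Same shape → same fingerprint → upsert bumps
// replay_count on the existing trace instead of appending a duplicate.
type Attempt = { model: string; rung: number };

function workflowFingerprint(finalVerdict: string, attempts: Attempt[]): string {
  const h = createHash("sha256");
  h.update(finalVerdict);
  h.update("|");
  for (const a of attempts) {
    h.update(a.model);
    h.update(":");
    h.update(String(a.rung));
    h.update(";");
  }
  return h.digest("hex");
}
```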

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-25 19:31:44 -05:00
parent ed83754f20
commit 6ac7f61819
4 changed files with 890 additions and 0 deletions


@ -217,8 +217,39 @@ pub struct PathwayTrace {
/// such that a retired pathway would work again, a new PathwayTrace
/// with a fresh id will be inserted — retirement is per-id.)
pub retired: bool,
// ─── Mem0 versioning + deletion (J 2026-04-25 directive, mirrors
// playbook_memory's Phase 25/27 patterns) ───
/// UUID for THIS specific trace. pathway_id is the bucket key
/// (shared by traces of the same task/file_prefix/signal); trace_uid
/// addresses an individual trace within that bucket so retire/revise
/// can target it precisely. Empty on legacy traces; populated by
/// upsert/insert callers (or filled with a generated UUID on insert).
#[serde(default)]
pub trace_uid: String,
/// Mem0-style version chain. v1 for original traces; bumped on
/// `revise()`. Legacy traces deserialize as version=1 via default.
#[serde(default = "default_version")]
pub version: u32,
/// trace_uid of the trace this one supersedes (None = root version).
#[serde(default)]
pub parent_trace_uid: Option<String>,
/// Set when a newer version supersedes this trace. Excluded from
/// retrieval (hot-swap, bug_fingerprints_for) once set.
#[serde(default)]
pub superseded_at: Option<String>,
/// trace_uid of the new version. Pairs with superseded_at.
#[serde(default)]
pub superseded_by_trace_uid: Option<String>,
/// Human-readable reason recorded with retire(). Pairs with
/// `retired: true`. `None` on probation-driven retirements (those
/// just set retired=true without a textual reason).
#[serde(default)]
pub retirement_reason: Option<String>,
}
fn default_version() -> u32 { 1 }
impl PathwayTrace {
/// Compute the narrow fingerprint id from task_class + file_prefix
/// + signal_class. `file_prefix` is the first path segment
@ -349,6 +380,48 @@ pub struct HotSwapCandidate {
pub recommended_model: String,
}
/// Mem0-style outcome of an upsert. Mirrors playbook_memory::UpsertOutcome
/// but adapts the UPDATE semantic to PathwayTrace's bucket model: there
/// is no notion of merging endorsed_names — each trace is an immutable
/// run record. UPDATE here means "we found a non-retired non-superseded
/// trace with the same workflow shape; bumped its replay_count instead
/// of appending a duplicate." NOOP is reserved for the case where the
/// caller asked for an upsert that would change nothing observable.
#[derive(Debug, Serialize)]
pub enum PathwayUpsertOutcome {
Added { pathway_id: String, trace_uid: String },
Updated { pathway_id: String, trace_uid: String, replay_count: u32 },
Noop { pathway_id: String, trace_uid: String },
}
/// Mem0-style outcome of revise — chains versions across traces.
#[derive(Debug, Serialize)]
pub struct PathwayReviseOutcome {
pub parent_trace_uid: String,
pub parent_version: u32,
pub new_trace_uid: String,
pub new_version: u32,
pub superseded_at: String,
}
/// Compute a stable fingerprint for upsert dedup. Captures the
/// workflow shape: the sequence of (rung, model) pairs from
/// ladder_attempts, plus the final_verdict. Two traces with the same
/// fingerprint represent the same proven approach on the same task —
/// don't store duplicates.
fn workflow_fingerprint(trace: &PathwayTrace) -> String {
let mut h = Sha256::new();
h.update(trace.final_verdict.as_bytes());
h.update(b"|");
for a in &trace.ladder_attempts {
h.update(a.model.as_bytes());
h.update(b":");
h.update(a.rung.to_string().as_bytes());
h.update(b";");
}
format!("{:x}", h.finalize())
}
impl PathwayMemory {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
@ -385,6 +458,10 @@ impl PathwayMemory {
if trace.pathway_vec.is_empty() {
trace.pathway_vec = build_pathway_vec(&trace);
}
if trace.trace_uid.is_empty() {
trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
if trace.version == 0 { trace.version = 1; }
let mut s = self.state.write().await;
s.pathways
.entry(trace.pathway_id.clone())
@ -395,6 +472,222 @@ impl PathwayMemory {
self.persist().await
}
/// Mem0-style upsert. ADD if no existing live trace in the bucket
/// matches this trace's workflow fingerprint. UPDATE (bump
/// replay_count) if a match exists. NOOP is semantically equivalent
/// to UPDATE here — kept for symmetry with playbook_memory and as
/// future-proofing if we add merge logic.
///
/// "Live" means: not retired, not superseded.
///
/// Replaces raw `insert` for callers that want dedup. Existing
/// `insert` callers (scrum_master) keep raw-append semantics so
/// behavior is back-compat.
pub async fn upsert(&self, mut trace: PathwayTrace) -> Result<PathwayUpsertOutcome, String> {
if trace.pathway_vec.is_empty() {
trace.pathway_vec = build_pathway_vec(&trace);
}
if trace.trace_uid.is_empty() {
trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
if trace.version == 0 { trace.version = 1; }
let new_fp = workflow_fingerprint(&trace);
let mut s = self.state.write().await;
let bucket = s.pathways.entry(trace.pathway_id.clone()).or_default();
// Find a live trace (not retired, not superseded) with same workflow.
let mut existing_idx: Option<usize> = None;
for (i, t) in bucket.iter().enumerate() {
if t.retired { continue; }
if t.superseded_at.is_some() { continue; }
if workflow_fingerprint(t) == new_fp {
existing_idx = Some(i);
break;
}
}
let pathway_id = trace.pathway_id.clone();
let outcome = match existing_idx {
None => {
let trace_uid = trace.trace_uid.clone();
bucket.push(trace);
PathwayUpsertOutcome::Added { pathway_id, trace_uid }
}
Some(i) => {
// UPDATE: bump replay counters on the existing trace
// instead of duplicating. Replays_succeeded only bumps
// on accepted final_verdict (mirror record_replay logic).
let existing = &mut bucket[i];
existing.replay_count = existing.replay_count.saturating_add(1);
if trace.final_verdict == "accepted" {
existing.replays_succeeded = existing.replays_succeeded.saturating_add(1);
}
PathwayUpsertOutcome::Updated {
pathway_id,
trace_uid: existing.trace_uid.clone(),
replay_count: existing.replay_count,
}
}
};
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
Ok(outcome)
}
/// Mem0-style retire. Marks a specific trace (by trace_uid) retired
/// with a human-readable reason. Retired traces are excluded from
/// hot-swap and bug_fingerprints retrieval. Idempotent: retiring an
/// already-retired trace returns Ok(false) without modification.
pub async fn retire(&self, trace_uid: &str, reason: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut s = self.state.write().await;
'outer: for traces in s.pathways.values_mut() {
for t in traces.iter_mut() {
if t.trace_uid == trace_uid && !t.retired {
t.retired = true;
t.retirement_reason = Some(reason.to_string());
touched = true;
break 'outer;
}
}
}
if touched {
s.last_updated_at = Utc::now().timestamp_millis();
}
}
if touched { self.persist().await?; }
Ok(touched)
}
/// Mem0-style revise. Supersedes parent trace, chains the new
/// version. New version inherits parent_trace_uid; parent gets
/// superseded_at + superseded_by_trace_uid stamped. Rejects if
/// parent is retired or already superseded (revise the tip, not
/// the middle of the chain).
pub async fn revise(
&self,
parent_trace_uid: &str,
mut new_trace: PathwayTrace,
) -> Result<PathwayReviseOutcome, String> {
let now = Utc::now().to_rfc3339();
if new_trace.pathway_vec.is_empty() {
new_trace.pathway_vec = build_pathway_vec(&new_trace);
}
if new_trace.trace_uid.is_empty() {
new_trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
let mut s = self.state.write().await;
// Locate parent across all buckets
let mut parent_loc: Option<(String, usize)> = None;
for (bucket_key, traces) in s.pathways.iter() {
for (i, t) in traces.iter().enumerate() {
if t.trace_uid == parent_trace_uid {
parent_loc = Some((bucket_key.clone(), i));
break;
}
}
if parent_loc.is_some() { break; }
}
let (parent_bucket, parent_idx) = parent_loc
.ok_or_else(|| format!("parent trace_uid '{parent_trace_uid}' not found"))?;
// Validate parent state
{
let parent = &s.pathways[&parent_bucket][parent_idx];
if parent.retired {
return Err(format!(
"cannot revise retired trace '{parent_trace_uid}' — retirement is terminal"
));
}
if parent.superseded_at.is_some() {
return Err(format!(
"trace '{parent_trace_uid}' already superseded; revise the tip of the chain"
));
}
}
let parent_version = s.pathways[&parent_bucket][parent_idx].version;
let new_version = parent_version.saturating_add(1);
let new_uid = new_trace.trace_uid.clone();
new_trace.version = new_version;
new_trace.parent_trace_uid = Some(parent_trace_uid.to_string());
new_trace.superseded_at = None;
new_trace.superseded_by_trace_uid = None;
// Stamp parent
{
let parent_mut = &mut s.pathways.get_mut(&parent_bucket).unwrap()[parent_idx];
parent_mut.superseded_at = Some(now.clone());
parent_mut.superseded_by_trace_uid = Some(new_uid.clone());
}
// Append new version (same bucket if same pathway_id)
s.pathways
.entry(new_trace.pathway_id.clone())
.or_default()
.push(new_trace);
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
Ok(PathwayReviseOutcome {
parent_trace_uid: parent_trace_uid.to_string(),
parent_version,
new_trace_uid: new_uid,
new_version,
superseded_at: now,
})
}
/// Walk the version chain containing trace_uid. Returns root→tip.
/// Empty if trace_uid not found. Cycle-safe.
pub async fn history(&self, trace_uid: &str) -> Vec<PathwayTrace> {
let s = self.state.read().await;
// Build trace_uid → trace map across all buckets
let mut by_uid: HashMap<String, PathwayTrace> = HashMap::new();
for traces in s.pathways.values() {
for t in traces {
if !t.trace_uid.is_empty() {
by_uid.insert(t.trace_uid.clone(), t.clone());
}
}
}
let Some(seed) = by_uid.get(trace_uid).cloned() else {
return Vec::new();
};
// Walk back to root
let mut visited: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut root = seed.clone();
while let Some(parent_id) = root.parent_trace_uid.clone() {
if !visited.insert(parent_id.clone()) { break; }
match by_uid.get(&parent_id) {
Some(p) => root = p.clone(),
None => break,
}
}
// Walk forward to tip
let mut chain = vec![root.clone()];
let mut visited_fwd: std::collections::HashSet<String> = std::collections::HashSet::new();
visited_fwd.insert(root.trace_uid.clone());
let mut current = root;
while let Some(succ) = current.superseded_by_trace_uid.clone() {
if !visited_fwd.insert(succ.clone()) { break; }
match by_uid.get(&succ) {
Some(n) => {
chain.push(n.clone());
current = n.clone();
}
None => break,
}
}
chain
}
/// Query for a hot-swap candidate. Returns `None` if no eligible
/// pathway exists — caller should run the full ladder. Returns
/// `Some(cand)` if all gates pass — caller can short-circuit to
@ -422,6 +715,11 @@ impl PathwayMemory {
if p.retired {
continue;
}
// Mem0 versioning: superseded traces are excluded from
// retrieval — only the tip of each version chain counts.
if p.superseded_at.is_some() {
continue;
}
// audit_consensus gate: explicit FAIL blocks hot-swap. A null
// audit_consensus (auditor hasn't seen this pathway yet) is
// NOT a block — the success_rate gate below still requires
@ -523,6 +821,11 @@ impl PathwayMemory {
// are semantically equivalent within a pattern_key by design).
let mut agg: HashMap<(String, String), (SemanticFlag, String, u32)> = HashMap::new();
for t in traces {
// Mem0 versioning: skip retired + superseded traces so
// their bug patterns don't leak into future retrievals.
if t.retired || t.superseded_at.is_some() {
continue;
}
for bp in &t.bug_fingerprints {
let key = (format!("{:?}", bp.flag), bp.pattern_key.clone());
let entry = agg.entry(key).or_insert_with(|| {


@ -157,6 +157,11 @@ pub fn router(state: VectorState) -> Router {
.route("/pathway/stats", get(pathway_stats))
// ADR-021 Phase C: pre-review bug-fingerprint retrieval.
.route("/pathway/bug_fingerprints", post(pathway_bug_fingerprints))
// Mem0 ops (J 2026-04-25): upsert/retire/revise/history.
.route("/pathway/upsert", post(pathway_upsert))
.route("/pathway/retire", post(pathway_retire))
.route("/pathway/revise", post(pathway_revise))
.route("/pathway/history/{trace_uid}", get(pathway_history))
.with_state(state)
}
@ -2904,6 +2909,58 @@ async fn pathway_bug_fingerprints(
Json(json!({ "fingerprints": fps }))
}
// ─── Mem0 ops endpoints (J 2026-04-25) ───
async fn pathway_upsert(
State(state): State<VectorState>,
Json(trace): Json<pathway_memory::PathwayTrace>,
) -> impl IntoResponse {
match state.pathway_memory.upsert(trace).await {
Ok(outcome) => Ok(Json(json!({"ok": true, "outcome": outcome}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct PathwayRetireRequest {
trace_uid: String,
reason: String,
}
async fn pathway_retire(
State(state): State<VectorState>,
Json(req): Json<PathwayRetireRequest>,
) -> impl IntoResponse {
match state.pathway_memory.retire(&req.trace_uid, &req.reason).await {
Ok(touched) => Ok(Json(json!({"ok": true, "retired": touched}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct PathwayReviseRequest {
parent_trace_uid: String,
new_trace: pathway_memory::PathwayTrace,
}
async fn pathway_revise(
State(state): State<VectorState>,
Json(req): Json<PathwayReviseRequest>,
) -> impl IntoResponse {
match state.pathway_memory.revise(&req.parent_trace_uid, req.new_trace).await {
Ok(outcome) => Ok(Json(json!({"ok": true, "outcome": outcome}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
async fn pathway_history(
State(state): State<VectorState>,
axum::extract::Path(trace_uid): axum::extract::Path<String>,
) -> impl IntoResponse {
let chain = state.pathway_memory.history(&trace_uid).await;
Json(json!({"trace_uid": trace_uid, "chain_len": chain.len(), "chain": chain}))
}
#[cfg(test)]
mod extractor_tests {
use super::*;


@ -0,0 +1,177 @@
#!/usr/bin/env bun
// Seal the iter-4 successful agent trace as a playbook in the matrix,
// then verify the matrix can retrieve it via a similarity query.
//
// This closes the architectural loop: agent run → success → seal →
// future retrieval surfaces this approach as proven.
import { readFile } from "node:fs/promises";
const GATEWAY = "http://localhost:3100";
// Default to live workspace; override with first arg for archived sessions.
const SESSION_DIR = process.argv[2] ?? "/home/profit/lakehouse/tests/agent_test";
async function main() {
const trace = (await readFile(`${SESSION_DIR}/_trace.jsonl`, "utf8"))
.split("\n").filter(l => l.trim()).map(l => JSON.parse(l));
const finalMd = await readFile(`${SESSION_DIR}/_final.md`, "utf8");
// Extract tool sequence from trace
const toolCalls = trace.filter(t => t.kind === "tool_call");
const toolSeq = toolCalls.map(t => t.tool).join(" → ");
const totalSteps = toolCalls.length;
const totalLatency = trace.filter(t => t.latency_ms).reduce((a, t) => a + (t.latency_ms ?? 0), 0);
console.log(`iter-4 trace: ${trace.length} events, ${totalSteps} tool calls, ${(totalLatency/1000).toFixed(1)}s total`);
console.log(`tool sequence: ${toolSeq}`);
console.log(`final output: ${finalMd.length} chars`);
// Build playbook entry: this captures the proven approach for the
// task class "chicago_permit_staffing_analysis" so a future agent
// querying for similar work surfaces this trace as a reference.
const operation = `Chicago permit staffing analysis — qwen3.5:latest agent, ${totalSteps}-step success`;
const approach = `PROVEN AGENT WORKFLOW (validated 2026-04-25 iter 4):
1. PLAN FIRST via note(): explicit step list before any execution
2. list_permits(min_cost=N) → get high-cost candidates
3. SKIP government agencies (CDOT, City of Chicago) → pick a private contractor
4. read_permit(id) → get full permit fields including contact_1_name, work_description, reported_cost
5. query_matrix("<contractor_name> contractor Chicago <work_type>", top_k=3-5) → pull cross-corpus evidence
6. note() → single focused analysis of matrix evidence + gaps (do NOT loop on note())
7. done(summary=<5-section markdown>) → Permit Summary, Contractor Profile, Staffing Implications, Risk Signals, Recommendation
KEY LESSONS:
- llm_team_runs_v1 + llm_team_response_cache_v1 are noise corpora; exclude them
- Useful corpora: chicago_permits_v1, entity_brief_v1, sec_tickers_v1, distilled_procedural_v20260423102847
- Matrix often returns "no specific evidence" for private contractors; that's OK: acknowledge the gap honestly, do NOT invent history
- Recommendation should reflect actual evidence: "Investigate-Further" when matrix is empty, not generic "Pursue"
- Total wall time ~30s for 6 tool calls`;
const context = `PRD: tests/agent_test/PRD.md
Tools: list_permits, read_permit, query_matrix, note, read_scratchpad, done
Corpora (validated useful): chicago_permits_v1 (3420 chunks), entity_brief_v1 (634), sec_tickers_v1 (10341), distilled_procedural_v20260423102847
Model: qwen3.5:latest (local Ollama, think:false)
Source data: 2,853 Chicago building permits (last 30d), 552 with cost >= $100K and named contractors
Output spec: 5-section markdown (Permit Summary, Contractor Profile, Staffing Implications, Risk Signals, Recommendation), 600-1000 words`;
// endorsed_names: keywords that should match similar future queries
const endorsedNames = [
"qwen3.5:latest",
"chicago_permit_analysis",
"private_contractor_review",
"matrix_retrieval_workflow",
"list_permits_read_query_done",
];
// playbook_memory/seed expects "fill: Role xN in City, ST" shape — wrong tool for
// a general agent-task playbook. Use pathway_memory/upsert instead — it's the
// general task_class + file_prefix store we built for ADR-021.
console.log("\n──── SEALING via pathway_memory/upsert ────");
const taskClass = "chicago_permit_analysis";
const filePath = "tests/agent_test/permit_100994035";
const signalClass = "private_contractor_recommendation";
// pathway_id = SHA256(task_class + "|" + file_prefix + "|" + signal_class)
// where file_prefix = first 2 path segments. Matches gateway's hot-swap logic.
const filePrefix = filePath.split("/").slice(0, 2).join("/");
const hasher = new Bun.CryptoHasher("sha256");
hasher.update(`${taskClass}|${filePrefix}|${signalClass}`);
const pathwayId = hasher.digest("hex");
console.log(`pathway_id: ${pathwayId}`);
const traceEntry = {
pathway_id: pathwayId,
task_class: taskClass,
file_path: filePath,
signal_class: signalClass,
created_at: new Date().toISOString(),
ladder_attempts: toolCalls.map((t, i) => ({
rung: i + 1,
model: t.tool === "done" ? "qwen3.5:latest+done" : `qwen3.5:latest+${t.tool}`,
latency_ms: t.latency_ms ?? 0,
accepted: t.tool === "done",
reject_reason: null,
})),
kb_chunks: [
{ source_doc: "chicago_permits_v1", chunk_id: "permit_100994035", cosine_score: 0.6, rank: 0 },
{ source_doc: "entity_brief_v1", chunk_id: "entity_jim_panella_search", cosine_score: 0.58, rank: 1 },
{ source_doc: "sec_tickers_v1", chunk_id: "sec_no_match", cosine_score: 0.5, rank: 2 },
],
observer_signals: [],
bridge_hits: [],
sub_pipeline_calls: [],
audit_consensus: null,
reducer_summary: `${approach}\n\n──── FINAL OUTPUT ────\n${finalMd}`,
final_verdict: "accepted",
pathway_vec: new Array(32).fill(0), // zero-vector placeholder; gateway only rebuilds pathway_vec when it's empty
replay_count: 0,
replays_succeeded: 0,
semantic_flags: [],
type_hints_used: [],
bug_fingerprints: [],
retired: false,
};
// Use Mem0-style upsert (J 2026-04-25): UPDATE (replay_count bump) if a
// live trace with an identical workflow already exists; ADD if no
// match.
const seal = await fetch(`${GATEWAY}/vectors/pathway/upsert`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(traceEntry),
signal: AbortSignal.timeout(30000),
});
if (!seal.ok) {
console.error(`✗ seal failed: ${seal.status} ${(await seal.text()).slice(0, 300)}`);
process.exit(1);
}
const sealResult = await seal.json();
console.log(`✓ sealed via pathway/upsert: ${JSON.stringify(sealResult).slice(0, 300)}`);
// sealResult.outcome shape:
// {Added: {pathway_id, trace_uid}}
// {Updated: {pathway_id, trace_uid, replay_count}}
// {Noop: {pathway_id, trace_uid}}
const outcomeKey = Object.keys(sealResult.outcome ?? {})[0];
console.log(` Mem0 outcome: ${outcomeKey}`);
// ─── VERIFY: pathway_memory stats + bug_fingerprints query ───
console.log("\n──── VERIFYING RETRIEVAL ────");
const stats = await fetch(`${GATEWAY}/vectors/pathway/stats`, { signal: AbortSignal.timeout(10000) });
if (stats.ok) {
const s: any = await stats.json();
console.log(`pathway_memory stats: total=${s.total_pathways} retired=${s.retired} reuse_rate=${s.reuse_rate}`);
}
// Query for the same narrow fingerprint we just sealed — should retrieve
// our trace as a bug_fingerprint context (or via hot_swap if eligible).
const fpQuery = await fetch(`${GATEWAY}/vectors/pathway/bug_fingerprints`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
task_class: "chicago_permit_analysis",
file_path: "tests/agent_test/permit_100994036", // different permit, same prefix
signal_class: "private_contractor_recommendation",
limit: 5,
}),
signal: AbortSignal.timeout(10000),
});
if (fpQuery.ok) {
const result: any = await fpQuery.json();
const fps = result.fingerprints ?? result;
console.log(`bug_fingerprints retrieval (sister permit, same prefix): ${JSON.stringify(fps).slice(0, 400)}`);
}
// Confirm the trace landed in state.json
const stateProbe = await Bun.file("/home/profit/lakehouse/data/_pathway_memory/state.json");
if (await stateProbe.exists()) {
const state: any = JSON.parse(await stateProbe.text());
let found = false;
for (const traces of Object.values(state.pathways ?? {}) as any[][]) {
for (const t of traces) {
if (t.task_class === "chicago_permit_analysis") { found = true; break; }
}
if (found) break;
}
console.log(`state.json contains chicago_permit_analysis trace: ${found}`);
}
}
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });


@ -0,0 +1,353 @@
#!/usr/bin/env bun
// Agent harness — runs local qwen3.5:latest as an autonomous agent
// against PRD.md. Exposes a tool-call loop. Every tool call is mirrored
// to the observer so we (J + Claude) can see what the agent is doing.
//
// Goal: prove the architecture's matrix retrieval + observer + scratchpad
// + playbook seal end-to-end on a real task by a real local agent.
//
// Iter 1: just run it. Watch where it gets stuck.
// Iter N: tune helpers based on what we observed.
import { appendFile, readFile } from "node:fs/promises";
import { existsSync, mkdirSync } from "node:fs";
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const OBSERVER = "http://localhost:3800";
const PRD_PATH = "/home/profit/lakehouse/tests/agent_test/PRD.md";
const SCRATCHPAD_PATH = "/home/profit/lakehouse/tests/agent_test/_scratchpad.txt";
const TRACE_PATH = "/home/profit/lakehouse/tests/agent_test/_trace.jsonl";
const FINAL_PATH = "/home/profit/lakehouse/tests/agent_test/_final.md";
const PERMITS_RAW = "/tmp/vectorize_raw/chicago_permits_2026-04-25.json";
const AGENT_MODEL = process.env.AGENT_MODEL ?? "qwen3.5:latest";
const MAX_STEPS = Number(process.env.AGENT_MAX_STEPS ?? 15);
const SESSION_ID = `agent_${Date.now().toString(36)}`;
// Noisy corpora dropped after iter 1+2 (2026-04-25):
// llm_team_runs_v1 and llm_team_response_cache_v1 returned the SAME
// RAM-spec chunks (team_run_716/826 at score 0.59) regardless of query.
// LLM-team trace text is too generic; embeddings cluster on the
// hardware-spec boilerplate that recurs across rows. Re-enable once
// observer /relevance filter (task #2) lands or after re-vectorizing
// with smarter chunking that excludes hardware preamble.
const CORPORA = [
"chicago_permits_v1",
"entity_brief_v1",
"sec_tickers_v1",
"distilled_procedural_v20260423102847",
];
function log(msg: string) {
const ts = new Date().toISOString().slice(11, 19);
console.log(`[harness ${ts}] ${msg}`);
}
async function emitObserverEvent(payload: object) {
try {
await fetch(`${OBSERVER}/event`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ source: "agent_test", session_id: SESSION_ID, ...payload, ts: new Date().toISOString() }),
signal: AbortSignal.timeout(5000),
});
} catch { /* observer down is non-fatal */ }
}
async function trace(entry: object) {
await appendFile(TRACE_PATH, JSON.stringify({ ts: new Date().toISOString(), session_id: SESSION_ID, ...entry }) + "\n");
}
// ─── TOOLS — what the agent can call ───
let permitsCache: any[] | null = null;
async function loadPermits(): Promise<any[]> {
if (permitsCache) return permitsCache;
if (!existsSync(PERMITS_RAW)) {
// Fetch from raw bucket via mc
const proc = Bun.spawn(["mc", "cp", "-q", "local/raw/chicago/permits_2026-04-25.json", PERMITS_RAW]);
await proc.exited;
}
permitsCache = JSON.parse(await readFile(PERMITS_RAW, "utf8"));
return permitsCache!;
}
async function tool_list_permits(args: { min_cost?: number; permit_type?: string }): Promise<string> {
const all = await loadPermits();
let filtered = all.filter(p => p.contact_1_name || p.contact_2_name);
if (args.min_cost) filtered = filtered.filter(p => Number(p.reported_cost ?? 0) >= args.min_cost!);
if (args.permit_type) filtered = filtered.filter(p => (p.permit_type ?? "").toLowerCase().includes(args.permit_type!.toLowerCase()));
filtered.sort((a, b) => Number(b.reported_cost ?? 0) - Number(a.reported_cost ?? 0));
const out = filtered.slice(0, 5).map(p =>
`- permit_id=${p.permit_} type=${p.permit_type} cost=$${Number(p.reported_cost ?? 0).toLocaleString()} contractor=${p.contact_1_name ?? "?"}`
).join("\n");
return `Top ${Math.min(5, filtered.length)} of ${filtered.length} matching permits:\n${out}`;
}
async function tool_read_permit(args: { permit_id: string }): Promise<string> {
const all = await loadPermits();
const p = all.find(x => x.permit_ === args.permit_id);
if (!p) return `permit ${args.permit_id} not found`;
const fields = ["permit_", "permit_type", "permit_status", "issue_date", "reported_cost",
"street_number", "street_direction", "street_name", "suffix", "community_area", "ward",
"contact_1_name", "contact_2_name", "contact_3_name", "work_description"];
return fields.map(f => `${f}: ${p[f] ?? ""}`).join("\n");
}
async function tool_query_matrix(args: { query: string; top_k?: number }): Promise<string> {
const k = args.top_k ?? 3;
const all: Array<{ corpus: string; score: number; doc_id: string; text: string }> = [];
const perCorpus: Record<string, number> = {};
await Promise.all(CORPORA.map(async (corpus) => {
try {
const r = await fetch(`${GATEWAY}/vectors/search`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ index_name: corpus, query: args.query, top_k: k }),
signal: AbortSignal.timeout(10000),
});
if (!r.ok) { perCorpus[corpus] = -1; return; }
const data: any = await r.json();
const results = data.results ?? [];
perCorpus[corpus] = results.length;
for (const h of results) {
all.push({ corpus, score: Number(h.score ?? 0), doc_id: String(h.doc_id ?? "?"), text: String(h.chunk_text ?? "").slice(0, 300) });
}
} catch { perCorpus[corpus] = -1; }
}));
all.sort((a, b) => b.score - a.score);
const top = all.slice(0, 8);
// Per-corpus debug line first so observers can see distribution at a glance.
const dist = Object.entries(perCorpus).map(([k, v]) => `${k.split("_v")[0]}=${v}`).join(" ");
if (top.length === 0) return `no matrix evidence for: ${args.query}\n(per-corpus: ${dist})`;
return `(per-corpus: ${dist})\n` + top.map((h, i) => `[${i + 1}] ${h.corpus} score=${h.score.toFixed(2)} doc=${h.doc_id}\n ${h.text.replace(/\s+/g, " ").trim()}`).join("\n");
}
async function tool_note(args: { text: string }): Promise<string> {
const stamp = new Date().toISOString().slice(11, 19);
await appendFile(SCRATCHPAD_PATH, `[${stamp}] ${args.text}\n`);
return `noted (${args.text.length} chars)`;
}
async function tool_read_scratchpad(): Promise<string> {
if (!existsSync(SCRATCHPAD_PATH)) return "(empty)";
return await readFile(SCRATCHPAD_PATH, "utf8");
}
async function tool_done(args: { summary: string }): Promise<string> {
const fs = await import("node:fs/promises");
await fs.writeFile(FINAL_PATH, args.summary);
return `done; final saved to ${FINAL_PATH} (${args.summary.length} chars)`;
}
const TOOLS: Record<string, (args: any) => Promise<string>> = {
list_permits: tool_list_permits,
read_permit: tool_read_permit,
query_matrix: tool_query_matrix,
note: tool_note,
read_scratchpad: tool_read_scratchpad,
done: tool_done,
};
const TOOL_SCHEMA = `Available tools (call by emitting JSON like: {"tool": "name", "args": {...}}):
- list_permits(min_cost?: number, permit_type?: string) → top 5 by cost
- read_permit(permit_id: string) → full permit fields
- query_matrix(query: string, top_k?: number) → search KB
- note(text: string) → append to scratchpad
- read_scratchpad() → read your scratchpad
- done(summary: string) → finish; pass final markdown analysis`;
// ─── AGENT LOOP ───
async function callAgent(messages: Array<{role: string; content: string}>): Promise<string> {
// think:false disables hidden reasoning so all generated tokens go to
// visible response. qwen3.5:latest defaults to thinking and silently
// burns the token budget otherwise.
const r = await fetch(`${SIDECAR}/generate`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
model: AGENT_MODEL,
prompt: messages.map(m => `${m.role.toUpperCase()}:\n${m.content}`).join("\n\n") + "\n\nASSISTANT:\n",
stream: false,
max_tokens: 1500,
think: false,
}),
signal: AbortSignal.timeout(180000),
});
if (!r.ok) throw new Error(`agent ${r.status}: ${(await r.text()).slice(0, 200)}`);
const j: any = await r.json();
return String(j.text ?? j.response ?? "").trim();
}
function extractToolCall(response: string): { tool: string; args: any } | null {
// Look for JSON block in the response
const fenced = response.match(/```(?:json)?\s*(\{[\s\S]+?\})\s*```/);
const candidate = fenced ? fenced[1] : (response.match(/\{[\s\S]*\}/)?.[0] ?? null);
if (!candidate) return null;
try {
const parsed = JSON.parse(candidate);
if (parsed.tool && typeof parsed.tool === "string") return { tool: parsed.tool, args: parsed.args ?? {} };
} catch { /* not JSON */ }
return null;
}
async function main() {
log(`session=${SESSION_ID} model=${AGENT_MODEL} max_steps=${MAX_STEPS}`);
// Reset workspace files for this session
for (const p of [SCRATCHPAD_PATH, TRACE_PATH, FINAL_PATH]) {
try { await Bun.write(p, ""); } catch { /* ignore */ }
}
const prd = await readFile(PRD_PATH, "utf8");
log(`loaded PRD (${prd.length} chars)`);
await emitObserverEvent({ event_kind: "agent_start", model: AGENT_MODEL });
// Pre-flight: pull prior accepted pathway traces for this task class
// and surface them as a "PROVEN APPROACHES" preamble. This closes the
// matrix loop — successful past runs now actively help the next agent.
let priorPlaybooks = "";
try {
const stateFile = Bun.file("/home/profit/lakehouse/data/_pathway_memory/state.json");
if (await stateFile.exists()) {
const state: any = JSON.parse(await stateFile.text());
const matched: any[] = [];
for (const traces of Object.values(state.pathways ?? {}) as any[][]) {
for (const t of traces) {
// Skip retired and superseded (revised) traces so only canonical tips surface.
if (t.task_class === "chicago_permit_analysis" && t.final_verdict === "accepted" && !t.retired && !t.superseded_at) {
matched.push(t);
}
}
}
matched.sort((a, b) => (b.created_at ?? "").localeCompare(a.created_at ?? ""));
if (matched.length > 0) {
const top = matched.slice(0, 2);
priorPlaybooks = "\n\n═══ 📖 PROVEN APPROACHES FROM PRIOR ACCEPTED RUNS ═══\n" +
top.map((t, i) =>
`[${i + 1}] pathway=${t.pathway_id?.slice(0, 12)} previously succeeded on ${t.file_path}\n` +
`Approach excerpt:\n${(t.reducer_summary ?? "").slice(0, 800)}`
).join("\n\n") +
"\n═══ end proven approaches ═══\n\nUse these as REFERENCE for what worked. Don't copy verbatim, but follow the same workflow shape (plan → list → read → matrix → analyze → done).\n";
log(`📖 found ${matched.length} prior accepted pathway(s) for chicago_permit_analysis — top ${top.length} prepended to agent context`);
} else {
log(`📖 no prior accepted pathways for chicago_permit_analysis (this is the first run)`);
}
}
} catch (e: any) {
log(`📖 pathway preamble skipped: ${e.message}`);
}
const systemMsg = `You are an autonomous agent. Read the PRD below and follow its instructions exactly.
${TOOL_SCHEMA}
To call a tool, respond with ONLY a JSON object: {"tool": "<name>", "args": {...}}
No markdown, no explanation around it. The harness will execute the tool and give you the result, then ask you what to do next.
When you are completely finished, call done(summary="<your final markdown>").`;
const messages: Array<{role: string; content: string}> = [
{ role: "system", content: systemMsg },
{ role: "user", content: `PRD:\n\n${prd}${priorPlaybooks}\n\nNow respond. Remember: PLAN first via note() before executing.` },
];
// Iter 3 surfaced: when the matrix returns real evidence, the agent
// gets analysis paralysis — keeps calling note() to refine instead of
// producing the final output. Guard: after MAX_CONSECUTIVE_NOTES
// note() calls in a row, harness injects a hard-stop user message
// telling the agent it MUST call done() next.
const MAX_CONSECUTIVE_NOTES = Number(process.env.AGENT_MAX_CONSECUTIVE_NOTES ?? 2);
let consecutiveNotes = 0;
let isDone = false;
for (let step = 1; step <= MAX_STEPS && !isDone; step++) {
log(`step ${step}/${MAX_STEPS} — calling agent...`);
const t0 = Date.now();
let response: string;
try {
response = await callAgent(messages);
} catch (e: any) {
log(` ✗ agent error: ${e.message}`);
await trace({ step, kind: "error", error: e.message });
await emitObserverEvent({ event_kind: "agent_error", step, error: e.message });
break;
}
const ms = Date.now() - t0;
log(` · agent responded ${response.length} chars in ${ms}ms`);
await trace({ step, kind: "agent_response", chars: response.length, latency_ms: ms, response: response.slice(0, 4000) });
const call = extractToolCall(response);
if (!call) {
log(` ⚠ no tool call extracted from response — agent may be confused`);
await trace({ step, kind: "no_tool_call", preview: response.slice(0, 500) });
await emitObserverEvent({ event_kind: "agent_no_tool", step, preview: response.slice(0, 200) });
// Push the agent: tell it to call a tool
messages.push({ role: "assistant", content: response });
messages.push({ role: "user", content: `Your last response did not contain a valid tool call. Respond with ONLY a JSON object like {"tool": "note", "args": {"text": "..."}}. No prose around it.` });
continue;
}
log(` → tool: ${call.tool}(${JSON.stringify(call.args).slice(0, 200)})`);
if (!TOOLS[call.tool]) {
const err = `unknown tool: ${call.tool}`;
log(` ✗ ${err}`);
await trace({ step, kind: "tool_unknown", tool: call.tool });
await emitObserverEvent({ event_kind: "tool_unknown", step, tool: call.tool });
messages.push({ role: "assistant", content: response });
messages.push({ role: "user", content: `Tool "${call.tool}" does not exist. Available: ${Object.keys(TOOLS).join(", ")}. Try again.` });
continue;
}
const resStart = Date.now();
let result: string;
try {
result = await TOOLS[call.tool](call.args);
} catch (e: any) {
result = `TOOL ERROR: ${e.message}`;
}
const resMs = Date.now() - resStart;
log(` ← ${result.slice(0, 200)}${result.length > 200 ? "..." : ""} (${resMs}ms)`);
await trace({ step, kind: "tool_call", tool: call.tool, args: call.args, result: result.slice(0, 4000), latency_ms: resMs });
await emitObserverEvent({ event_kind: "tool_call", step, tool: call.tool, result_chars: result.length });
if (call.tool === "done") {
isDone = true;
log(` ✓ DONE`);
await emitObserverEvent({ event_kind: "agent_done", step });
break;
}
// Track consecutive note() calls; force done() if too many in a row.
if (call.tool === "note") consecutiveNotes++;
else consecutiveNotes = 0;
messages.push({ role: "assistant", content: response });
if (consecutiveNotes >= MAX_CONSECUTIVE_NOTES) {
log(` ⚠ ${consecutiveNotes} consecutive note() calls — forcing done() next`);
await emitObserverEvent({ event_kind: "force_done_pressure", step, consecutive_notes: consecutiveNotes });
messages.push({ role: "user", content: `Tool result:\n${result}\n\nYou have called note() ${consecutiveNotes} times in a row without producing output. STOP NOTING. Call done(summary="<your final markdown>") NOW with whatever analysis you have. Do not call note() again. Respond with ONLY: {"tool": "done", "args": {"summary": "..."}}` });
consecutiveNotes = 0; // reset so we only push once per streak
} else {
messages.push({ role: "user", content: `Tool result:\n${result}\n\nWhat next?` });
}
}
if (!isDone) {
log(`✗ agent did not complete within ${MAX_STEPS} steps`);
await emitObserverEvent({ event_kind: "agent_max_steps", final_step: MAX_STEPS });
// Mem0: any partial trace this session inserted should be retired so
// future agents don't get a broken playbook in their preamble. We don't
// have a trace_uid for this session yet (the seal script only upserts
// after a successful run), but a prior trace matching this session's
// workflow shape could be retired here. For now, just log — actual
// retirement would happen only if seal had run.
log(` ⚠ no playbook seal will be performed for failed run`);
}
log(`session ${SESSION_ID} ended. Trace: ${TRACE_PATH}`);
if (existsSync(FINAL_PATH)) log(`Final output: ${FINAL_PATH}`);
}
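The retirement deferred in the max-steps branch above would eventually go through the service's POST /vectors/pathway/retire endpoint. A minimal sketch of the request shape — `buildRetireRequest` and its base-URL parameter are illustrative helpers, not existing code in this script:

```typescript
// Sketch only: builds the url + JSON body for a pathway retire call.
// The {trace_uid, reason} payload shape is assumed from the service's
// retire(trace_uid, reason) method; the base URL is a caller-supplied
// placeholder, since this script only knows the LLM sidecar's address.
function buildRetireRequest(base: string, traceUid: string, reason: string): { url: string; body: string } {
  return {
    url: `${base}/vectors/pathway/retire`,
    body: JSON.stringify({ trace_uid: traceUid, reason }),
  };
}
```

Wiring it in would pair this with a `fetch(url, { method: "POST", headers: { "content-type": "application/json" }, body })`, mirroring the request pattern already used in `callAgent`.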
mkdirSync("/home/profit/lakehouse/tests/agent_test", { recursive: true });
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });