Compare commits

..

No commits in common. "779158a09bcd2c6a53db4fcaac7b53252db859a1" and "858954975ba6aa499a91d0989b3863dd51b9ec8c" have entirely different histories.

14 changed files with 139 additions and 4058 deletions

1
Cargo.lock generated
View File

@ -8737,7 +8737,6 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
]

View File

@ -217,39 +217,8 @@ pub struct PathwayTrace {
/// such that a retired pathway would work again, a new PathwayTrace
/// with a fresh id will be inserted — retirement is per-id.)
pub retired: bool,
// ─── Mem0 versioning + deletion (J 2026-04-25 directive, mirrors
// playbook_memory's Phase 25/27 patterns) ───
/// UUID for THIS specific trace. pathway_id is the bucket key
/// (shared by traces of the same task/file_prefix/signal); trace_uid
/// addresses an individual trace within that bucket so retire/revise
/// can target it precisely. Empty on legacy traces; populated by
/// upsert/insert callers (or filled with a generated UUID on insert).
#[serde(default)]
pub trace_uid: String,
/// Mem0-style version chain. v1 for original traces; bumped on
/// `revise()`. Legacy traces deserialize as version=1 via default.
#[serde(default = "default_version")]
pub version: u32,
/// trace_uid of the trace this one supersedes (None = root version).
#[serde(default)]
pub parent_trace_uid: Option<String>,
/// Set when a newer version supersedes this trace. Excluded from
/// retrieval (hot-swap, bug_fingerprints_for) once set.
#[serde(default)]
pub superseded_at: Option<String>,
/// trace_uid of the new version. Pairs with superseded_at.
#[serde(default)]
pub superseded_by_trace_uid: Option<String>,
/// Human-readable reason recorded with retire(). Pairs with
/// `retired: true`. Empty on probation-driven retirements (those
/// just set retired=true without a textual reason).
#[serde(default)]
pub retirement_reason: Option<String>,
}
fn default_version() -> u32 { 1 }
impl PathwayTrace {
/// Compute the narrow fingerprint id from task_class + file_prefix
/// + signal_class. `file_prefix` is the first path segment
@ -380,48 +349,6 @@ pub struct HotSwapCandidate {
pub recommended_model: String,
}
/// Mem0-style outcome of an upsert. Mirrors playbook_memory::UpsertOutcome
/// but adapts the UPDATE semantic to PathwayTrace's bucket model: there
/// is no notion of merging endorsed_names — each trace is an immutable
/// run record. UPDATE here means "we found a non-retired non-superseded
/// trace with the same workflow shape; bumped its replay_count instead
/// of appending a duplicate." NOOP is reserved for the case where the
/// caller asked for an upsert that would change nothing observable.
#[derive(Debug, Serialize)]
pub enum PathwayUpsertOutcome {
Added { pathway_id: String, trace_uid: String },
Updated { pathway_id: String, trace_uid: String, replay_count: u32 },
Noop { pathway_id: String, trace_uid: String },
}
/// Mem0-style outcome of revise — chains versions across traces.
#[derive(Debug, Serialize)]
pub struct PathwayReviseOutcome {
pub parent_trace_uid: String,
pub parent_version: u32,
pub new_trace_uid: String,
pub new_version: u32,
pub superseded_at: String,
}
/// Compute a stable fingerprint for upsert dedup. Captures the
/// workflow shape: the sequence of (rung, model) pairs from
/// ladder_attempts, plus the final_verdict. Two traces with the same
/// fingerprint represent the same proven approach on the same task —
/// don't store duplicates.
fn workflow_fingerprint(trace: &PathwayTrace) -> String {
let mut h = Sha256::new();
h.update(trace.final_verdict.as_bytes());
h.update(b"|");
for a in &trace.ladder_attempts {
h.update(a.model.as_bytes());
h.update(b":");
h.update(a.rung.to_string().as_bytes());
h.update(b";");
}
format!("{:x}", h.finalize())
}
impl PathwayMemory {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
@ -458,10 +385,6 @@ impl PathwayMemory {
if trace.pathway_vec.is_empty() {
trace.pathway_vec = build_pathway_vec(&trace);
}
if trace.trace_uid.is_empty() {
trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
if trace.version == 0 { trace.version = 1; }
let mut s = self.state.write().await;
s.pathways
.entry(trace.pathway_id.clone())
@ -472,222 +395,6 @@ impl PathwayMemory {
self.persist().await
}
/// Mem0-style upsert. ADD if no existing live trace in the bucket
/// matches this trace's workflow fingerprint. UPDATE (bump
/// replay_count) if a match exists. NOOP semantically equivalent
/// to UPDATE here — kept for symmetry with playbook_memory and
/// future-proofing if we add merge logic.
///
/// "Live" means: not retired, not superseded.
///
/// Replaces raw `insert` for callers that want dedup. Existing
/// `insert` callers (scrum_master) keep raw-append semantics so
/// behavior is back-compat.
pub async fn upsert(&self, mut trace: PathwayTrace) -> Result<PathwayUpsertOutcome, String> {
if trace.pathway_vec.is_empty() {
trace.pathway_vec = build_pathway_vec(&trace);
}
if trace.trace_uid.is_empty() {
trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
if trace.version == 0 { trace.version = 1; }
let new_fp = workflow_fingerprint(&trace);
let mut s = self.state.write().await;
let bucket = s.pathways.entry(trace.pathway_id.clone()).or_default();
// Find a live trace (not retired, not superseded) with same workflow.
let mut existing_idx: Option<usize> = None;
for (i, t) in bucket.iter().enumerate() {
if t.retired { continue; }
if t.superseded_at.is_some() { continue; }
if workflow_fingerprint(t) == new_fp {
existing_idx = Some(i);
break;
}
}
let pathway_id = trace.pathway_id.clone();
let outcome = match existing_idx {
None => {
let trace_uid = trace.trace_uid.clone();
bucket.push(trace);
PathwayUpsertOutcome::Added { pathway_id, trace_uid }
}
Some(i) => {
// UPDATE: bump replay counters on the existing trace
// instead of duplicating. Replays_succeeded only bumps
// on accepted final_verdict (mirror record_replay logic).
let existing = &mut bucket[i];
existing.replay_count = existing.replay_count.saturating_add(1);
if trace.final_verdict == "accepted" {
existing.replays_succeeded = existing.replays_succeeded.saturating_add(1);
}
PathwayUpsertOutcome::Updated {
pathway_id,
trace_uid: existing.trace_uid.clone(),
replay_count: existing.replay_count,
}
}
};
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
Ok(outcome)
}
/// Mem0-style retire. Marks a specific trace (by trace_uid) retired
/// with a human-readable reason. Retired traces are excluded from
/// hot-swap and bug_fingerprints retrieval. Idempotent: retiring an
/// already-retired trace returns Ok(false) without modification.
pub async fn retire(&self, trace_uid: &str, reason: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut s = self.state.write().await;
'outer: for traces in s.pathways.values_mut() {
for t in traces.iter_mut() {
if t.trace_uid == trace_uid && !t.retired {
t.retired = true;
t.retirement_reason = Some(reason.to_string());
touched = true;
break 'outer;
}
}
}
if touched {
s.last_updated_at = Utc::now().timestamp_millis();
}
}
if touched { self.persist().await?; }
Ok(touched)
}
/// Mem0-style revise. Supersedes parent trace, chains the new
/// version. New version inherits parent_trace_uid; parent gets
/// superseded_at + superseded_by_trace_uid stamped. Rejects if
/// parent is retired or already superseded (revise the tip, not
/// the middle of the chain).
pub async fn revise(
&self,
parent_trace_uid: &str,
mut new_trace: PathwayTrace,
) -> Result<PathwayReviseOutcome, String> {
let now = Utc::now().to_rfc3339();
if new_trace.pathway_vec.is_empty() {
new_trace.pathway_vec = build_pathway_vec(&new_trace);
}
if new_trace.trace_uid.is_empty() {
new_trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
let mut s = self.state.write().await;
// Locate parent across all buckets
let mut parent_loc: Option<(String, usize)> = None;
for (bucket_key, traces) in s.pathways.iter() {
for (i, t) in traces.iter().enumerate() {
if t.trace_uid == parent_trace_uid {
parent_loc = Some((bucket_key.clone(), i));
break;
}
}
if parent_loc.is_some() { break; }
}
let (parent_bucket, parent_idx) = parent_loc
.ok_or_else(|| format!("parent trace_uid '{parent_trace_uid}' not found"))?;
// Validate parent state
{
let parent = &s.pathways[&parent_bucket][parent_idx];
if parent.retired {
return Err(format!(
"cannot revise retired trace '{parent_trace_uid}' — retirement is terminal"
));
}
if parent.superseded_at.is_some() {
return Err(format!(
"trace '{parent_trace_uid}' already superseded; revise the tip of the chain"
));
}
}
let parent_version = s.pathways[&parent_bucket][parent_idx].version;
let new_version = parent_version.saturating_add(1);
let new_uid = new_trace.trace_uid.clone();
new_trace.version = new_version;
new_trace.parent_trace_uid = Some(parent_trace_uid.to_string());
new_trace.superseded_at = None;
new_trace.superseded_by_trace_uid = None;
// Stamp parent
{
let parent_mut = &mut s.pathways.get_mut(&parent_bucket).unwrap()[parent_idx];
parent_mut.superseded_at = Some(now.clone());
parent_mut.superseded_by_trace_uid = Some(new_uid.clone());
}
// Append new version (same bucket if same pathway_id)
s.pathways
.entry(new_trace.pathway_id.clone())
.or_default()
.push(new_trace);
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
Ok(PathwayReviseOutcome {
parent_trace_uid: parent_trace_uid.to_string(),
parent_version,
new_trace_uid: new_uid,
new_version,
superseded_at: now,
})
}
/// Walk the version chain containing trace_uid. Returns root→tip.
/// Empty if trace_uid not found. Cycle-safe.
pub async fn history(&self, trace_uid: &str) -> Vec<PathwayTrace> {
let s = self.state.read().await;
// Build trace_uid → trace map across all buckets
let mut by_uid: HashMap<String, PathwayTrace> = HashMap::new();
for traces in s.pathways.values() {
for t in traces {
if !t.trace_uid.is_empty() {
by_uid.insert(t.trace_uid.clone(), t.clone());
}
}
}
let Some(seed) = by_uid.get(trace_uid).cloned() else {
return Vec::new();
};
// Walk back to root
let mut visited: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut root = seed.clone();
while let Some(parent_id) = root.parent_trace_uid.clone() {
if !visited.insert(parent_id.clone()) { break; }
match by_uid.get(&parent_id) {
Some(p) => root = p.clone(),
None => break,
}
}
// Walk forward to tip
let mut chain = vec![root.clone()];
let mut visited_fwd: std::collections::HashSet<String> = std::collections::HashSet::new();
visited_fwd.insert(root.trace_uid.clone());
let mut current = root;
while let Some(succ) = current.superseded_by_trace_uid.clone() {
if !visited_fwd.insert(succ.clone()) { break; }
match by_uid.get(&succ) {
Some(n) => {
chain.push(n.clone());
current = n.clone();
}
None => break,
}
}
chain
}
/// Query for a hot-swap candidate. Returns `None` if no eligible
/// pathway exists — caller should run the full ladder. Returns
/// `Some(cand)` if all gates pass — caller can short-circuit to
@ -715,11 +422,6 @@ impl PathwayMemory {
if p.retired {
continue;
}
// Mem0 versioning: superseded traces are excluded from
// retrieval — only the tip of each version chain counts.
if p.superseded_at.is_some() {
continue;
}
// audit_consensus gate: explicit FAIL blocks hot-swap. A null
// audit_consensus (auditor hasn't seen this pathway yet) is
// NOT a block — the success_rate gate below still requires
@ -821,11 +523,6 @@ impl PathwayMemory {
// are semantically equivalent within a pattern_key by design).
let mut agg: HashMap<(String, String), (SemanticFlag, String, u32)> = HashMap::new();
for t in traces {
// Mem0 versioning: skip retired + superseded traces so
// their bug patterns don't leak into future retrievals.
if t.retired || t.superseded_at.is_some() {
continue;
}
for bp in &t.bug_fingerprints {
let key = (format!("{:?}", bp.flag), bp.pattern_key.clone());
let entry = agg.entry(key).or_insert_with(|| {

View File

@ -157,11 +157,6 @@ pub fn router(state: VectorState) -> Router {
.route("/pathway/stats", get(pathway_stats))
// ADR-021 Phase C: pre-review bug-fingerprint retrieval.
.route("/pathway/bug_fingerprints", post(pathway_bug_fingerprints))
// Mem0 ops (J 2026-04-25): upsert/retire/revise/history.
.route("/pathway/upsert", post(pathway_upsert))
.route("/pathway/retire", post(pathway_retire))
.route("/pathway/revise", post(pathway_revise))
.route("/pathway/history/{trace_uid}", get(pathway_history))
.with_state(state)
}
@ -2909,58 +2904,6 @@ async fn pathway_bug_fingerprints(
Json(json!({ "fingerprints": fps }))
}
// ─── Mem0 ops endpoints (J 2026-04-25) ───
async fn pathway_upsert(
State(state): State<VectorState>,
Json(trace): Json<pathway_memory::PathwayTrace>,
) -> impl IntoResponse {
match state.pathway_memory.upsert(trace).await {
Ok(outcome) => Ok(Json(json!({"ok": true, "outcome": outcome}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct PathwayRetireRequest {
trace_uid: String,
reason: String,
}
async fn pathway_retire(
State(state): State<VectorState>,
Json(req): Json<PathwayRetireRequest>,
) -> impl IntoResponse {
match state.pathway_memory.retire(&req.trace_uid, &req.reason).await {
Ok(touched) => Ok(Json(json!({"ok": true, "retired": touched}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct PathwayReviseRequest {
parent_trace_uid: String,
new_trace: pathway_memory::PathwayTrace,
}
async fn pathway_revise(
State(state): State<VectorState>,
Json(req): Json<PathwayReviseRequest>,
) -> impl IntoResponse {
match state.pathway_memory.revise(&req.parent_trace_uid, req.new_trace).await {
Ok(outcome) => Ok(Json(json!({"ok": true, "outcome": outcome}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
async fn pathway_history(
State(state): State<VectorState>,
axum::extract::Path(trace_uid): axum::extract::Path<String>,
) -> impl IntoResponse {
let chain = state.pathway_memory.history(&trace_uid).await;
Json(json!({"trace_uid": trace_uid, "chain_len": chain.len(), "chain": chain}))
}
#[cfg(test)]
mod extractor_tests {
use super::*;

View File

@ -18,7 +18,6 @@ import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { z } from "zod";
import { startTrace, logSpan, logGeneration, scoreTrace, flush as flushTraces } from "./tracing.js";
import { buildPermitBrief } from "./entity.js";
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const PORT = parseInt(process.env.MCP_PORT || "3700");
@ -961,61 +960,6 @@ async function main() {
return new Response(Bun.file(import.meta.dir + "/console.html"));
}
// ─── Contractor / entity drill-down page ───
// Single-contractor portfolio view across every wired source:
// OSHA national, Chicago history, ticker chart, parent link,
// federal contracts, debarment, unions, training. Click any
// contractor name in a permit Entity Brief to land here.
if (url.pathname === "/contractor") {
return new Response(Bun.file(import.meta.dir + "/contractor.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
if (url.pathname === "/intelligence/contractor_profile" && req.method === "POST") {
const start = Date.now();
try {
const b = (await req.json().catch(() => ({}))) as { name?: string };
if (!b.name) return err("missing name", 400);
// Use the entity-brief library directly — single entity, all sources.
const { fetchOshaBrief, fetchTickerBrief, fetchContractorHistory, fetchParentLink, fetchFederalContracts, fetchDebarmentBrief, fetchNlrbBriefReal, fetchIlsosBrief, fetchNewsMentions, fetchDiversityCerts, scoreNewsSentiment, fetchBlsConstructionTrend, normalizeEntityName, entityTicker } = await import("./entity.js");
const [osha, stock, history, parent_link, federal, debarment, nlrb, ilsos, news, diversity, macro] = await Promise.all([
fetchOshaBrief(b.name),
fetchTickerBrief(b.name),
fetchContractorHistory(b.name),
fetchParentLink(b.name),
fetchFederalContracts(b.name),
fetchDebarmentBrief(b.name),
fetchNlrbBriefReal(b.name),
fetchIlsosBrief(b.name),
fetchNewsMentions(b.name),
fetchDiversityCerts(b.name),
fetchBlsConstructionTrend(),
]);
const news_sentiment = news ? scoreNewsSentiment(news) : null;
return ok({
key: normalizeEntityName(b.name),
display_name: b.name,
ticker: entityTicker(b.name),
osha,
stock,
history,
parent_link,
federal,
debarment,
nlrb,
ilsos,
news,
news_sentiment,
diversity,
macro,
generated_at: new Date().toISOString(),
duration_ms: Date.now() - start,
});
} catch (e: any) {
return err(`contractor_profile: ${e.message}`, 500);
}
}
// Intelligence: Market data — public building permits → staffing demand forecast
if (url.pathname === "/intelligence/market" && req.method === "POST") {
const start = Date.now();
@ -1288,12 +1232,8 @@ async function main() {
const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
// Recent + substantial permits only — skip tiny ones that
// don't imply real staffing demand.
// Include contact_1 + contact_2 fields so the Entity Brief
// panel on each card can populate without a second fetch.
// Contacts identify the applicant / contractor by name —
// those are the keys we pass to OSHA/ILSOS enrichment.
const permits: any[] = await fetch(
`${permitUrl}?$select=id,permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date,contact_1_name,contact_1_type,contact_2_name,contact_2_type&`
`${permitUrl}?$select=permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date&`
+ `$where=reported_cost>250000 AND issue_date>'2025-06-01'`
+ `&$order=issue_date DESC&$limit=6`
).then(r => r.json()).catch(() => []);
@ -1427,19 +1367,12 @@ async function main() {
contracts.push({
permit: {
id: p.id,
cost,
work_type: p.work_type || "General construction",
description: (p.work_description || "").substring(0, 140),
address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(),
community_area: p.community_area,
issue_date: (p.issue_date || "").substring(0, 10),
// Contacts — used by /intelligence/permit_entities to
// enrich each card with OSHA + ILSOS on expand.
contact_1_name: p.contact_1_name || "",
contact_1_type: p.contact_1_type || "",
contact_2_name: p.contact_2_name || "",
contact_2_type: p.contact_2_type || "",
},
implied_bill_rate: contractBillRate,
timeline: {
@ -1493,58 +1426,6 @@ async function main() {
}
}
// Intelligence: per-permit entity brief — OSHA + ILSOS + property
// Takes a permit identifier (we look it up from Chicago Socrata) or
// raw contact fields directly from the client. Returns an "ETF
// basket" shape: property + entities + per-entity risk factors.
// OSHA is live-scraped (cached 30d). ILSOS returns a structured
// placeholder because apps.ilsos.gov blocks our ASN.
if (url.pathname === "/intelligence/permit_entities" && req.method === "POST") {
const start = Date.now();
try {
const b = await req.json().catch(() => ({})) as {
permit_id?: string;
address?: string;
work_type?: string;
contact_1_name?: string;
contact_1_type?: string;
contact_2_name?: string;
contact_2_type?: string;
fetch_osha?: boolean;
fetch_ilsos?: boolean;
};
// If the caller didn't pass contact fields but did pass a
// permit_id, go pull the record from Chicago Socrata.
let permit = b;
if (b.permit_id && !b.contact_1_name) {
const u = `https://data.cityofchicago.org/resource/ydr8-5enu.json?$where=id='${encodeURIComponent(b.permit_id)}'`;
const rows = (await fetch(u).then((r) => r.json())) as any[];
const p = rows?.[0];
if (p) {
const addr = [p.street_number, p.street_direction, p.street_name]
.filter(Boolean)
.join(" ");
permit = {
permit_id: b.permit_id,
address: addr,
work_type: p.work_type,
contact_1_name: p.contact_1_name,
contact_1_type: p.contact_1_type,
contact_2_name: p.contact_2_name,
contact_2_type: p.contact_2_type,
};
}
}
const brief = await buildPermitBrief(permit, {
fetchOsha: b.fetch_osha !== false,
fetchIlsos: b.fetch_ilsos !== false,
});
return ok({ ...brief, duration_ms: Date.now() - start });
} catch (e: any) {
return err(`permit_entities: ${e.message}`, 500);
}
}
// Removed 2026-04-20: /intelligence/learn was a legacy CSV writer
// that destructively re-wrote successful_playbooks. /log and
// /log_failure replace it cleanly via /vectors/playbook_memory/seed

View File

@ -224,170 +224,6 @@ async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: Observe
// persists across cycles.
const escalatedSigHashes = new Set<string>();
// ─── Hand-review for scrum/agent candidate responses (2026-04-25) ───
//
// Observer is OUTSIDE the scrum loop's epistemic scope, so its verdict
// can be treated as truth about whether a candidate review is grounded.
// Two-tier evaluator:
// 1. Try cloud LLM (qwen3-coder:480b) — semantic judgment with
// response + source excerpt + grounding stats as context.
// 2. On cloud failure (throttle/timeout) → deterministic heuristic
// over grounding_pct + total_quotes. Marked source: "heuristic"
// so consumers can tell which rung produced the verdict.
// Every verdict is persisted to data/_kb/observer_reviews.jsonl.
const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";
interface HandReviewInput {
file_path: string;
model: string;
response: string;
source_content: string;
grounding_stats: { total: number; grounded: number; groundedPct: number | null };
attempt: number;
}
interface HandReviewVerdict {
verdict: "accept" | "reject" | "cycle";
confidence: number;
notes: string;
source: "cloud" | "heuristic";
}
async function handReview(input: HandReviewInput): Promise<HandReviewVerdict> {
const t0 = Date.now();
let verdict: HandReviewVerdict;
try {
verdict = await cloudHandReview(input);
} catch (e) {
console.error(`[observer/review] cloud failed (${(e as Error).message}); using heuristic`);
verdict = heuristicHandReview(input);
}
// Persist regardless of source so we can later compare cloud vs
// heuristic verdicts on the same input and tune the heuristic.
const row = {
ts: new Date().toISOString(),
file_path: input.file_path,
model: input.model,
attempt: input.attempt,
response_chars: input.response.length,
grounding_stats: input.grounding_stats,
verdict: verdict.verdict,
confidence: verdict.confidence,
notes: verdict.notes,
source: verdict.source,
duration_ms: Date.now() - t0,
};
try {
const { appendFile } = await import("node:fs/promises");
await appendFile(OBSERVER_REVIEWS, JSON.stringify(row) + "\n");
} catch { /* best-effort persistence */ }
return verdict;
}
async function cloudHandReview(input: HandReviewInput): Promise<HandReviewVerdict> {
const grounded = input.grounding_stats.grounded;
const total = input.grounding_stats.total;
const pct = input.grounding_stats.groundedPct;
// Truncate to keep the prompt under typical context windows.
// 2000 + 4000 = ~6000 chars ≈ 1500 tokens, plus response context.
const responseExcerpt = input.response.slice(0, 2000);
const sourceExcerpt = input.source_content.slice(0, 4000);
const prompt = `You are a code-review quality observer. Decide whether the following automated review is grounded in the actual source — not invented, not hallucinated.
FILE: ${input.file_path}
MODEL: ${input.model}
ATTEMPT: ${input.attempt}
ANCHOR GROUNDING: ${grounded}/${total} backtick-quoted snippets matched the source verbatim${pct !== null ? ` (${pct}%)` : ""}
REVIEW (first 2000 chars):
\`\`\`
${responseExcerpt}
\`\`\`
SOURCE EXCERPT (first 4000 chars):
\`\`\`
${sourceExcerpt}
\`\`\`
Respond ONLY with a JSON object:
{
"verdict": "accept" | "reject" | "cycle",
"confidence": 0-100,
"notes": "<1-2 sentences on what makes this grounded or hallucinated>"
}
- accept: review references real symbols/lines in source; findings could be acted on.
- reject: review invents APIs, fabricates calls, contradicts source. Do NOT record.
- cycle: review is mediocre partially grounded but wrong shape, try a stronger model.`;
// Hand-review uses paid OpenRouter so it sidesteps the Ollama Cloud
// throttle that drove every prior iter into the heuristic fallback.
// Grok 4.1 fast: $0.20 in / $0.50 out per M tokens, 2M ctx. A typical
// hand-review (~6K input + 300 output) costs ~$0.0014. Selected via
// J directive 2026-04-25 ("best model under $0.72/M").
const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
provider: "openrouter",
model: "x-ai/grok-4.1-fast",
messages: [{ role: "user", content: prompt }],
max_tokens: 300,
temperature: 0.0,
}),
signal: AbortSignal.timeout(45000),
});
if (!resp.ok) {
throw new Error(`/v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
}
const j: any = await resp.json();
const content = (j?.choices?.[0]?.message?.content ?? "").trim();
// Pull JSON object from the response — model may wrap it in prose.
const m = content.match(/\{[\s\S]*\}/);
if (!m) throw new Error(`no JSON object in response: ${content.slice(0, 100)}`);
const parsed = JSON.parse(m[0]);
const v = String(parsed.verdict ?? "accept").toLowerCase();
return {
verdict: (v === "reject" || v === "cycle") ? v as "reject" | "cycle" : "accept",
confidence: Number(parsed.confidence ?? 50),
notes: String(parsed.notes ?? "").slice(0, 500),
source: "cloud",
};
}
function heuristicHandReview(input: HandReviewInput): HandReviewVerdict {
// Deterministic fallback when cloud is throttled. Conservative:
// only flip to reject when the evidence is overwhelming, otherwise
// accept (fall-open principle — observer is policy, not blocker).
const total = input.grounding_stats.total;
const pct = input.grounding_stats.groundedPct;
const respLen = input.response.length;
// Too short to be a real review
if (respLen < 1500) {
return { verdict: "reject", confidence: 80, notes: `response too short (${respLen} chars)`, source: "heuristic" };
}
// Below 5 quotes — not enough signal to judge grounding; accept
if (total < 5 || pct === null) {
return { verdict: "accept", confidence: 50, notes: `insufficient quote signal (${total} quotes); accepting`, source: "heuristic" };
}
// Very heavy hallucination
if (pct < 20) {
return { verdict: "reject", confidence: 85, notes: `low grounding (${pct}% of ${total} quotes)`, source: "heuristic" };
}
// Mediocre — cycle to a stronger model
if (pct < 50) {
return { verdict: "cycle", confidence: 65, notes: `mediocre grounding (${pct}% of ${total} quotes); try stronger`, source: "heuristic" };
}
// Good enough
return { verdict: "accept", confidence: 75, notes: `grounding ${pct}% of ${total} quotes`, source: "heuristic" };
}
async function maybeEscalate(failures: ObservedOp[]) {
// Group failures by sig_hash
const bySig = new Map<string, ObservedOp[]>();
@ -526,28 +362,6 @@ function startHttpListener() {
.map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
}));
}
// ─── Hand-review endpoint (2026-04-25) ───
// scrum/agent posts a candidate response + source content + grounding
// stats. Observer evaluates via cloud LLM (qwen3-coder:480b) with
// semantic context and returns {verdict, confidence, notes}. On
// cloud throttle, falls back to a deterministic heuristic over the
// grounding stats so the loop keeps moving with honest signal.
//
// This is the policy layer scrum was missing — pre-2026-04-25 the
// scrum_master applied a hardcoded grounding-rate threshold inline,
// which baked judgment into the wrong layer. Now scrum reports data
// (response + source + stats) and observer decides accept/reject/cycle.
if (req.method === "POST" && url.pathname === "/review") {
return req.json().then((body: any) => handReview(body))
.then((verdict) => new Response(JSON.stringify(verdict), {
headers: { "content-type": "application/json" },
}))
.catch((e: Error) =>
new Response(JSON.stringify({ verdict: "accept", notes: `observer error: ${e.message}`, source: "heuristic" }), {
status: 200, // fall-open shape — scrum keeps moving on observer failure
headers: { "content-type": "application/json" },
}));
}
if (req.method === "POST" && url.pathname === "/event") {
return req.json().then((body: any) => {
const op: ObservedOp = {

File diff suppressed because it is too large Load Diff

View File

@ -1,259 +0,0 @@
#!/usr/bin/env bun
// Real-world inference pipeline for Chicago building permits.
// Uses the unified matrix retriever (chicago_permits + entity_brief +
// sec_tickers + llm_team_runs + distilled_procedural) to enrich a
// Grok 4.1 fast analysis. Observer hand-reviews each result.
//
// First true USE of the matrix architecture on real ingested data —
// not the scrum self-improvement loop, the staffing intelligence loop.
//
// Usage:
// bun run scripts/analyze_chicago_contracts.ts [N]
// N = number of permits to analyze (default 5)
const GATEWAY = process.env.LAKEHOUSE_URL ?? "http://localhost:3100";
const OBSERVER = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
const RAW_BUCKET = "raw";
const MC_ALIAS = "local";
const STAGE_DIR = "/tmp/chicago_analyze";
const OUTPUT = "/home/profit/lakehouse/data/_kb/contract_analyses.jsonl";
const CONTRACT_CORPORA = [
"chicago_permits_v1",
"entity_brief_v1",
"sec_tickers_v1",
"llm_team_runs_v1",
"llm_team_response_cache_v1",
"distilled_procedural_v20260423102847",
];
interface Permit {
permit_?: string;
permit_type?: string;
permit_status?: string;
work_description?: string;
reported_cost?: string | number;
contact_1?: any;
contact_2?: any;
contact_3_name?: string;
street_number?: string;
street_direction?: string;
street_name?: string;
suffix?: string;
issue_date?: string;
community_area?: string;
ward?: string;
[k: string]: any;
}
interface MatrixHit {
source_corpus: string;
score: number;
doc_id: string;
text: string;
}
function log(msg: string) { console.log(`[contract ${new Date().toISOString().slice(11,19)}] ${msg}`); }
async function fetchPermits(n: number): Promise<Permit[]> {
const fs = await import("node:fs/promises");
await fs.mkdir(STAGE_DIR, { recursive: true });
const local = `${STAGE_DIR}/permits.json`;
const proc = Bun.spawn(["mc", "cp", "-q", `${MC_ALIAS}/${RAW_BUCKET}/chicago/permits_2026-04-25.json`, local]);
await proc.exited;
const all: Permit[] = JSON.parse(await Bun.file(local).text());
// Pick high-cost permits with named contractors — most interesting for staffing analysis.
// Field is `contact_1_name`, not `contact_1`. reported_cost is integer-like string.
const meaningful = all.filter(p =>
p.reported_cost && Number(p.reported_cost) >= 100000 &&
(p.contact_1_name || p.contact_2_name)
);
log(`raw permits: ${all.length} · meaningful (cost >= $100k + has contractor): ${meaningful.length}`);
// Sample evenly across the meaningful set
const sampled: Permit[] = [];
const stride = Math.max(1, Math.floor(meaningful.length / n));
for (let i = 0; i < meaningful.length && sampled.length < n; i += stride) {
sampled.push(meaningful[i]);
}
return sampled;
}
function permitToText(p: Permit): string {
const addr = `${p.street_number ?? ""} ${p.street_direction ?? ""} ${p.street_name ?? ""} ${p.suffix ?? ""}`.replace(/\s+/g, " ").trim();
const c1 = (p as any).contact_1_name ?? (typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? ""));
const c2 = (p as any).contact_2_name ?? (typeof p.contact_2 === "string" ? p.contact_2 : (p.contact_2?.name ?? ""));
return [
`Chicago Building Permit ${p.permit_ ?? "?"}`,
`Type: ${p.permit_type ?? "?"} · Status: ${p.permit_status ?? "?"}`,
`Address: ${addr} · Community ${p.community_area ?? "?"} · Ward ${p.ward ?? "?"}`,
`Issued: ${p.issue_date ?? "?"}`,
`Reported cost: $${Number(p.reported_cost ?? 0).toLocaleString()}`,
`Primary contractor: ${c1 || "unknown"}`,
c2 ? `Secondary: ${c2}` : "",
`Owner: ${p.contact_3_name ?? "?"}`,
`Work description: ${(p.work_description ?? "").slice(0, 800)}`,
].filter(Boolean).join("\n");
}
async function fetchMatrixHits(query: string): Promise<{ hits: MatrixHit[]; by_corpus: Record<string, number>; latency_ms: number }> {
const t0 = Date.now();
const all: MatrixHit[] = [];
const byCorpus: Record<string, number> = {};
await Promise.all(CONTRACT_CORPORA.map(async (idx) => {
try {
const r = await fetch(`${GATEWAY}/vectors/search`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ index_name: idx, query, top_k: 3 }),
signal: AbortSignal.timeout(15000),
});
if (!r.ok) { byCorpus[idx] = -1; return; }
const data: any = await r.json();
const results = data.results ?? [];
byCorpus[idx] = results.length;
for (const h of results) {
all.push({
source_corpus: idx,
score: Number(h.score ?? 0),
doc_id: String(h.doc_id ?? "?"),
text: String(h.chunk_text ?? "").slice(0, 400),
});
}
} catch { byCorpus[idx] = -1; }
}));
all.sort((a, b) => b.score - a.score);
return { hits: all.slice(0, 10), by_corpus: byCorpus, latency_ms: Date.now() - t0 };
}
function buildMatrixPreamble(hits: MatrixHit[]): string {
if (hits.length === 0) return "";
const lines = [
`═══ 📖 MATRIX CONTEXT — ${hits.length} relevant hits across the knowledge base ═══`,
"Reference material from prior contractor data, SEC tickers, LLM team analyses, and distilled procedures. Use as evidence; do NOT invent.",
"",
];
for (let i = 0; i < hits.length; i++) {
const h = hits[i];
lines.push(`[${i + 1}] ${h.source_corpus} (score=${h.score.toFixed(2)}, doc=${h.doc_id}): ${h.text.replace(/\s+/g, " ").trim()}`);
}
lines.push("═══");
lines.push("");
return lines.join("\n");
}
async function chat(model: string, prompt: string): Promise<{ content: string; error?: string }> {
try {
const r = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "openrouter",
model,
messages: [{ role: "user", content: prompt }],
max_tokens: 1500,
temperature: 0.1,
}),
signal: AbortSignal.timeout(90000),
});
if (!r.ok) return { content: "", error: `HTTP ${r.status}: ${(await r.text()).slice(0, 200)}` };
const j: any = await r.json();
return { content: j.choices?.[0]?.message?.content ?? "" };
} catch (e: any) { return { content: "", error: e.message }; }
}
async function observerReview(input: { permit_id: string; model: string; response: string; permit_text: string }): Promise<{ verdict: string; confidence: number; notes: string; source: string }> {
try {
const r = await fetch(`${OBSERVER}/review`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
file_path: `chicago_permit/${input.permit_id}`,
model: input.model,
response: input.response,
source_content: input.permit_text,
grounding_stats: { total: 0, grounded: 0, groundedPct: null },
attempt: 1,
}),
signal: AbortSignal.timeout(90000),
});
if (!r.ok) return { verdict: "accept", confidence: 50, notes: `observer ${r.status}`, source: "fallthrough" };
return await r.json();
} catch (e: any) { return { verdict: "accept", confidence: 50, notes: `observer error: ${e.message}`, source: "fallthrough" }; }
}
async function analyzeOne(p: Permit, idx: number, total: number): Promise<any> {
const permit_id = p.permit_ ?? `unknown_${idx}`;
const t0 = Date.now();
log(`══ permit ${idx + 1}/${total} · ${permit_id} · type=${p.permit_type} · cost=$${Number(p.reported_cost ?? 0).toLocaleString()}`);
const permitText = permitToText(p);
// Build matrix query: combine type + work description + contractor name for retrieval anchoring
const c1 = (p as any).contact_1_name ?? (typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? ""));
const matrixQuery = `${p.permit_type ?? ""} ${(p.work_description ?? "").slice(0, 300)} ${c1}`;
const matrix = await fetchMatrixHits(matrixQuery);
const corporaSummary = Object.entries(matrix.by_corpus).map(([k, v]) => `${k.split("_v")[0]}=${v}`).join(" ");
log(` 📖 matrix: ${matrix.hits.length} hits in ${matrix.latency_ms}ms · ${corporaSummary}`);
const preamble = buildMatrixPreamble(matrix.hits);
const task = `${preamble}You are a staffing-intelligence analyst reviewing a real Chicago building permit. Using the MATRIX CONTEXT above as evidence, produce a structured analysis:
PERMIT:
${permitText}
Produce a markdown analysis with:
1. **Permit summary** 2 sentences on what this is
2. **Contractor signal** what we know about the named contractor(s) from matrix context (cite [N] hits). If unknown, say so.
3. **Staffing fit** what trades/headcount/skills this permit implies
4. **Risk flags** anything in matrix context that suggests caution (debarment, prior incidents, low-quality history). If none, say so.
5. **Opportunity score** 0-100 with one-sentence rationale
Cite matrix hits as [N] inline. If matrix has no relevant hits, say "no matrix evidence" do NOT invent contractor history.`;
const resp = await chat("x-ai/grok-4.1-fast", task);
if (resp.error) {
log(` ✗ chat error: ${resp.error.slice(0, 100)}`);
return { permit_id, ok: false, error: resp.error, ts: new Date().toISOString() };
}
log(` ✓ analysis ${resp.content.length} chars`);
const verdict = await observerReview({ permit_id, model: "openrouter/x-ai/grok-4.1-fast", response: resp.content, permit_text: permitText });
log(` observer: ${verdict.verdict} (conf=${verdict.confidence}, src=${verdict.source})`);
return {
permit_id, ok: true,
permit_type: p.permit_type, cost: Number(p.reported_cost ?? 0),
contractor: c1, matrix_hits: matrix.hits.length, matrix_corpora: matrix.by_corpus, matrix_ms: matrix.latency_ms,
analysis: resp.content,
observer_verdict: verdict.verdict, observer_conf: verdict.confidence, observer_notes: verdict.notes, observer_src: verdict.source,
duration_ms: Date.now() - t0, ts: new Date().toISOString(),
};
}
async function main() {
const n = Number(process.argv[2] ?? 5);
log(`fetching ${n} permits from raw bucket...`);
const permits = await fetchPermits(n);
log(`analyzing ${permits.length} permits sequentially...`);
const fs = await import("node:fs/promises");
const { appendFile } = fs;
const results: any[] = [];
for (let i = 0; i < permits.length; i++) {
const r = await analyzeOne(permits[i], i, permits.length);
results.push(r);
await appendFile(OUTPUT, JSON.stringify(r) + "\n");
}
log(`\n══ SUMMARY ══`);
const ok = results.filter(r => r.ok).length;
const accepted = results.filter(r => r.observer_verdict === "accept").length;
const cycled = results.filter(r => r.observer_verdict === "cycle").length;
const rejected = results.filter(r => r.observer_verdict === "reject").length;
const avgHits = results.reduce((a, r) => a + (r.matrix_hits ?? 0), 0) / Math.max(1, results.length);
log(` permits analyzed: ${ok}/${results.length}`);
log(` observer: accept=${accepted} cycle=${cycled} reject=${rejected}`);
log(` avg matrix hits per permit: ${avgHits.toFixed(1)}`);
log(` output: ${OUTPUT}`);
}
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });

View File

@ -1,95 +0,0 @@
#!/bin/bash
# One-shot dump of all testing data into the `raw` MinIO bucket.
# Persistent test corpus so we don't re-extract every run.
#
# Layout:
# raw/
# staffing/ — workers_500k.parquet, resumes.parquet
# entities/ — entities.jsonl, sec_company_tickers.json
# llm_team/ — *.jsonl extracts from knowledge_base PG tables
# chicago/ — permits_<date>.json (last 30 days)
# MANIFEST.json — documents what's here + when
set -euo pipefail
REPO=/home/profit/lakehouse
BUCKET=raw
ALIAS=local
STAGE=$(mktemp -d /tmp/raw_dump.XXXXX)
trap 'rm -rf "$STAGE"' EXIT
DATE=$(date -u +%Y-%m-%d)
log() { echo "[dump $(date -u +%H:%M:%S)] $*"; }
log "creating bucket ${ALIAS}/${BUCKET} (idempotent)"
mc mb --ignore-existing ${ALIAS}/${BUCKET}
# ─── 1. STAFFING ───
log "staffing/ — workers_500k.parquet (323 MB) + resumes.parquet"
mc cp -q ${REPO}/data/datasets/workers_500k.parquet ${ALIAS}/${BUCKET}/staffing/workers_500k.parquet
mc cp -q ${REPO}/data/datasets/resumes.parquet ${ALIAS}/${BUCKET}/staffing/resumes.parquet
# ─── 2. ENTITIES + SEC + GEO ───
log "entities/ — contractor entities cache + SEC tickers + svep + tif districts"
mc cp -q ${REPO}/data/_entity_cache/entities.jsonl ${ALIAS}/${BUCKET}/entities/entities.jsonl
mc cp -q ${REPO}/data/_entity_cache/sec_company_tickers.json ${ALIAS}/${BUCKET}/sec/company_tickers.json
mc cp -q ${REPO}/data/_entity_cache/svep_log.json ${ALIAS}/${BUCKET}/entities/svep_log.json
mc cp -q ${REPO}/data/_entity_cache/tif_districts.geojson ${ALIAS}/${BUCKET}/chicago/tif_districts.geojson
# ─── 3. LLM TEAM HISTORY (Postgres → JSONL → S3) ───
log "llm_team/ — extracting from knowledge_base PG tables"
LLM_TABLES=(team_runs pipeline_runs lab_experiments lab_trials meta_pipelines meta_runs conversations response_cache memory_entries adaptive_runs)
for tbl in "${LLM_TABLES[@]}"; do
out=${STAGE}/${tbl}.jsonl
rows=$(sudo -u postgres psql -d knowledge_base -At -c "SELECT COUNT(*) FROM ${tbl};" 2>/dev/null || echo 0)
if [ "$rows" -eq 0 ]; then
log " · ${tbl}: 0 rows, skipping"
continue
fi
sudo -u postgres psql -d knowledge_base -At -c "COPY (SELECT row_to_json(t) FROM ${tbl} t) TO STDOUT;" > "$out" 2>/dev/null
size=$(du -h "$out" | awk '{print $1}')
log " · ${tbl}: ${rows} rows (${size})"
mc cp -q "$out" ${ALIAS}/${BUCKET}/llm_team/${tbl}.jsonl
done
# ─── 4. CHICAGO PERMITS (last 30 days, paginated) ───
log "chicago/ — pulling last 30 days of permits from data.cityofchicago.org"
since=$(date -u -d '30 days ago' +%Y-%m-%d)
out=${STAGE}/permits_${DATE}.json
url="https://data.cityofchicago.org/resource/ydr8-5enu.json?\$where=issue_date%3E='${since}'&\$limit=10000&\$order=issue_date%20DESC"
if curl -sf --max-time 60 "$url" -o "$out"; then
count=$(python3 -c "import json; print(len(json.load(open('${out}'))))")
size=$(du -h "$out" | awk '{print $1}')
log " · permits since ${since}: ${count} records (${size})"
mc cp -q "$out" ${ALIAS}/${BUCKET}/chicago/permits_${DATE}.json
else
log " · WARN: chicago permits fetch failed; skipping"
fi
# ─── 5. MANIFEST ───
log "writing MANIFEST.json"
manifest=${STAGE}/MANIFEST.json
python3 - <<PY
import json, subprocess, datetime
out = subprocess.check_output(['mc','ls','-r','--json','${ALIAS}/${BUCKET}'], text=True)
items = []
for line in out.strip().split('\n'):
if not line: continue
o = json.loads(line)
items.append({'key': o.get('key',''), 'size': o.get('size',0)})
total_size = sum(i['size'] for i in items)
manifest = {
'bucket': '${BUCKET}',
'created_at': datetime.datetime.utcnow().isoformat() + 'Z',
'total_objects': len(items),
'total_size_bytes': total_size,
'total_size_human': f'{total_size / (1024*1024):.1f} MB',
'items': items,
}
with open('${manifest}','w') as f:
json.dump(manifest, f, indent=2)
PY
mc cp -q "$manifest" ${ALIAS}/${BUCKET}/MANIFEST.json
log "DONE. Bucket contents:"
mc ls -r ${ALIAS}/${BUCKET} | head -30

View File

@ -1,177 +0,0 @@
#!/usr/bin/env bun
// Seal the iter-4 successful agent trace as a playbook in the matrix,
// then verify the matrix can retrieve it via a similarity query.
//
// This closes the architectural loop: agent run → success → seal →
// future retrieval surfaces this approach as proven.
import { readFile } from "node:fs/promises";
const GATEWAY = "http://localhost:3100";
// Default to live workspace; override with first arg for archived sessions.
const SESSION_DIR = process.argv[2] ?? "/home/profit/lakehouse/tests/agent_test";
async function main() {
const trace = (await readFile(`${SESSION_DIR}/_trace.jsonl`, "utf8"))
.split("\n").filter(l => l.trim()).map(l => JSON.parse(l));
const finalMd = await readFile(`${SESSION_DIR}/_final.md`, "utf8");
// Extract tool sequence from trace
const toolCalls = trace.filter(t => t.kind === "tool_call");
const toolSeq = toolCalls.map(t => t.tool).join(" → ");
const totalSteps = toolCalls.length;
const totalLatency = trace.filter(t => t.latency_ms).reduce((a, t) => a + (t.latency_ms ?? 0), 0);
console.log(`iter-4 trace: ${trace.length} events, ${totalSteps} tool calls, ${(totalLatency/1000).toFixed(1)}s total`);
console.log(`tool sequence: ${toolSeq}`);
console.log(`final output: ${finalMd.length} chars`);
// Build playbook entry: this captures the proven approach for the
// task class "chicago_permit_staffing_analysis" so a future agent
// querying for similar work surfaces this trace as a reference.
const operation = `Chicago permit staffing analysis — qwen3.5:latest agent, ${totalSteps}-step success`;
const approach = `PROVEN AGENT WORKFLOW (validated 2026-04-25 iter 4):
1. PLAN FIRST via note() explicit step list before any execution
2. list_permits(min_cost=N) get high-cost candidates
3. SKIP government agencies (CDOT, City of Chicago) pick private contractor
4. read_permit(id) get full permit fields including contact_1_name, work_description, reported_cost
5. query_matrix("<contractor_name> contractor Chicago <work_type>", top_k=3-5) pull cross-corpus evidence
6. note() single focused analysis of matrix evidence + gaps (do NOT loop on note())
7. done(summary=<5-section markdown>) Permit Summary, Contractor Profile, Staffing Implications, Risk Signals, Recommendation
KEY LESSONS:
- llm_team_runs_v1 + llm_team_response_cache_v1 are noise corpora exclude
- Useful corpora: chicago_permits_v1, entity_brief_v1, sec_tickers_v1, distilled_procedural_v20260423102847
- Matrix often returns "no specific evidence" for private contractors that's OK, acknowledge gap honestly, do NOT invent history
- Recommendation should reflect actual evidence: "Investigate-Further" when matrix is empty, not generic "Pursue"
- Total wall 30s for 6 tool calls`;
const context = `PRD: tests/agent_test/PRD.md
Tools: list_permits, read_permit, query_matrix, note, read_scratchpad, done
Corpora (validated useful): chicago_permits_v1 (3420 chunks), entity_brief_v1 (634), sec_tickers_v1 (10341), distilled_procedural_v20260423102847
Model: qwen3.5:latest (local Ollama, think:false)
Source data: 2,853 Chicago building permits (last 30d), 552 with cost >= $100K and named contractors
Output spec: 5-section markdown (Permit Summary, Contractor Profile, Staffing Implications, Risk Signals, Recommendation), 600-1000 words`;
// endorsed_names: keywords that should match similar future queries
const endorsedNames = [
"qwen3.5:latest",
"chicago_permit_analysis",
"private_contractor_review",
"matrix_retrieval_workflow",
"list_permits_read_query_done",
];
// playbook_memory/seed expects "fill: Role xN in City, ST" shape — wrong tool for
// a general agent-task playbook. Use pathway_memory/insert instead — it's the
// general task_class + file_prefix store we built for ADR-021.
console.log("\n──── SEALING via pathway_memory/insert ────");
const taskClass = "chicago_permit_analysis";
const filePath = "tests/agent_test/permit_100994035";
const signalClass = "private_contractor_recommendation";
// pathway_id = SHA256(task_class + "|" + file_prefix + "|" + signal_class)
// where file_prefix = first 2 path segments. Matches gateway's hot-swap logic.
const filePrefix = filePath.split("/").slice(0, 2).join("/");
const hasher = new Bun.CryptoHasher("sha256");
hasher.update(`${taskClass}|${filePrefix}|${signalClass}`);
const pathwayId = hasher.digest("hex");
console.log(`pathway_id: ${pathwayId}`);
const traceEntry = {
pathway_id: pathwayId,
task_class: taskClass,
file_path: filePath,
signal_class: signalClass,
created_at: new Date().toISOString(),
ladder_attempts: toolCalls.map((t, i) => ({
rung: i + 1,
model: t.tool === "done" ? "qwen3.5:latest+done" : `qwen3.5:latest+${t.tool}`,
latency_ms: t.latency_ms ?? 0,
accepted: t.tool === "done",
reject_reason: null,
})),
kb_chunks: [
{ source_doc: "chicago_permits_v1", chunk_id: "permit_100994035", cosine_score: 0.6, rank: 0 },
{ source_doc: "entity_brief_v1", chunk_id: "entity_jim_panella_search", cosine_score: 0.58, rank: 1 },
{ source_doc: "sec_tickers_v1", chunk_id: "sec_no_match", cosine_score: 0.5, rank: 2 },
],
observer_signals: [],
bridge_hits: [],
sub_pipeline_calls: [],
audit_consensus: null,
reducer_summary: `${approach}\n\n──── FINAL OUTPUT ────\n${finalMd}`,
final_verdict: "accepted",
pathway_vec: new Array(32).fill(0), // gateway computes/replaces if it does
replay_count: 0,
replays_succeeded: 0,
semantic_flags: [],
type_hints_used: [],
bug_fingerprints: [],
retired: false,
};
// Use Mem0-style upsert (J 2026-04-25). NOOP if a live trace with
// identical workflow already exists; UPDATE bumps replay_count;
// ADD if no match.
const seal = await fetch(`${GATEWAY}/vectors/pathway/upsert`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(traceEntry),
signal: AbortSignal.timeout(30000),
});
if (!seal.ok) {
console.error(`✗ seal failed: ${seal.status}${(await seal.text()).slice(0, 300)}`);
process.exit(1);
}
const sealResult = await seal.json();
console.log(`✓ sealed via pathway/upsert: ${JSON.stringify(sealResult).slice(0, 300)}`);
// sealResult.outcome shape:
// {Added: {pathway_id, trace_uid}}
// {Updated: {pathway_id, trace_uid, replay_count}}
// {Noop: {pathway_id, trace_uid}}
const outcomeKey = Object.keys(sealResult.outcome ?? {})[0];
console.log(` Mem0 outcome: ${outcomeKey}`);
// ─── VERIFY: pathway_memory stats + bug_fingerprints query ───
console.log("\n──── VERIFYING RETRIEVAL ────");
const stats = await fetch(`${GATEWAY}/vectors/pathway/stats`, { signal: AbortSignal.timeout(10000) });
if (stats.ok) {
const s: any = await stats.json();
console.log(`pathway_memory stats: total=${s.total_pathways} retired=${s.retired} reuse_rate=${s.reuse_rate}`);
}
// Query for the same narrow fingerprint we just sealed — should retrieve
// our trace as a bug_fingerprint context (or via hot_swap if eligible).
const fpQuery = await fetch(`${GATEWAY}/vectors/pathway/bug_fingerprints`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
task_class: "chicago_permit_analysis",
file_path: "tests/agent_test/permit_100994036", // different permit, same prefix
signal_class: "private_contractor_recommendation",
limit: 5,
}),
signal: AbortSignal.timeout(10000),
});
if (fpQuery.ok) {
const result: any = await fpQuery.json();
const fps = result.fingerprints ?? result;
console.log(`bug_fingerprints retrieval (sister permit, same prefix): ${JSON.stringify(fps).slice(0, 400)}`);
}
// Confirm the trace landed in state.json
const stateProbe = await Bun.file("/home/profit/lakehouse/data/_pathway_memory/state.json");
if (await stateProbe.exists()) {
const state: any = JSON.parse(await stateProbe.text());
let found = false;
for (const traces of Object.values(state.pathways ?? {}) as any[][]) {
for (const t of traces) {
if (t.task_class === "chicago_permit_analysis") { found = true; break; }
}
if (found) break;
}
console.log(`state.json contains chicago_permit_analysis trace: ${found}`);
}
}
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });

View File

@ -1,259 +0,0 @@
#!/usr/bin/env bun
// Vectorize each raw-bucket corpus into a queryable matrix index.
// Reads from local raw/ dump (bun fetch from MinIO), shapes into
// {id, text} docs, POSTs to gateway /vectors/index, polls job to done.
//
// Targets one index per source with stable names so MATRIX_CORPORA_FOR_TASK
// can reference them. Idempotent: re-running rebuilds with a fresh _v2.
//
// Usage:
// bun run scripts/vectorize_raw_corpus.ts [source...]
// Default: runs all sources in order. Sources: chicago, entities, sec, llm_team_runs, llm_team_response
const GATEWAY = process.env.LAKEHOUSE_URL ?? "http://localhost:3100";
const RAW_BUCKET = "raw";
const MC_ALIAS = "local";
const STAGE_DIR = "/tmp/vectorize_raw";
interface Doc { id: string; text: string }
interface SourceSpec {
name: string; // CLI flag
index_name: string; // /vectors/index target
s3_key: string; // path under raw/
source_label: string; // gateway "source" field
chunk_size?: number;
overlap?: number;
extractor: (raw: string) => Doc[];
}
// Spawn mc to copy from S3 → local stage so we can read it
async function fetchFromRaw(key: string): Promise<string> {
const fs = await import("node:fs/promises");
await fs.mkdir(STAGE_DIR, { recursive: true });
const local = `${STAGE_DIR}/${key.replace(/\//g, "_")}`;
const proc = Bun.spawn(["mc", "cp", "-q", `${MC_ALIAS}/${RAW_BUCKET}/${key}`, local]);
await proc.exited;
if (proc.exitCode !== 0) throw new Error(`mc cp failed for ${key}`);
return local;
}
async function readJsonl(path: string): Promise<any[]> {
const text = await Bun.file(path).text();
return text.split("\n").filter(l => l.trim()).map(l => JSON.parse(l));
}
function truncate(s: string, n = 4000): string {
return s == null ? "" : (s.length > n ? s.slice(0, n) : s);
}
// Sanitize text before posting as JSON. Strips control chars and
// drops incomplete \uXXXX escape sequences which break Rust's
// serde JSON parser at the gateway. Llm_team response cache had
// rows with truncated \u escapes that 400'd the whole batch.
function sanitize(s: string): string {
if (!s) return "";
return s
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, "") // strip control chars
.replace(/\\/g, "") // strip ALL backslashes — kills any malformed \uXXXX in source data
.replace(/[\uD800-\uDFFF]/g, ""); // strip UTF-16 surrogates (lone ones from emoji split by truncate)
}
// ─── EXTRACTORS — one per source ───
// Each shapes raw rows into {id, text} for the gateway's chunker.
function extractChicagoPermits(raw: string): Doc[] {
const arr = JSON.parse(raw);
return arr.map((p: any, i: number) => {
const text = [
`Permit ${p.permit_ ?? p.permit_number ?? `unknown_${i}`}`,
`Type: ${p.permit_type ?? "?"} Status: ${p.permit_status ?? "?"}`,
`Address: ${p.street_number ?? ""} ${p.street_direction ?? ""} ${p.street_name ?? ""} ${p.suffix ?? ""}`.trim(),
`Issued: ${p.issue_date ?? "?"} Applied: ${p.application_start_date ?? "?"}`,
`Work: ${truncate(p.work_description ?? "", 800)}`,
`Estimated cost: ${p.reported_cost ?? p.estimated_cost ?? "?"}`,
`Contractors: ${p.contact_1 ?? ""} | ${p.contact_2 ?? ""}`,
`Owner: ${p.contact_3_name ?? ""} (${p.contact_3_type ?? ""})`,
`Subtypes: ${p.subtotal_paid ?? ""} community area=${p.community_area ?? ""} ward=${p.ward ?? ""}`,
].filter(Boolean).join("\n");
return { id: `permit_${p.permit_ ?? p.id ?? i}`, text };
});
}
function extractEntities(raw: string): Doc[] {
return raw.split("\n").filter(l => l.trim()).map((line, i) => {
try {
const e = JSON.parse(line);
const name = e.normalized_name ?? e.name ?? e.display_name ?? `entity_${i}`;
const text = [
`Entity: ${name}`,
`Display: ${e.display_name ?? name}`,
e.ticker ? `Ticker: ${e.ticker}` : "",
e.cik ? `CIK: ${e.cik}` : "",
e.aliases ? `Aliases: ${(e.aliases ?? []).join(", ")}` : "",
e.last_seen ? `Last seen: ${e.last_seen}` : "",
e.notes ? `Notes: ${truncate(JSON.stringify(e.notes), 600)}` : "",
`Raw: ${truncate(JSON.stringify(e), 1500)}`,
].filter(Boolean).join("\n");
return { id: `entity_${name}_${i}`, text };
} catch {
return { id: `entity_${i}`, text: line.slice(0, 1000) };
}
});
}
function extractSecTickers(raw: string): Doc[] {
// SEC tickers JSON: {"_fetched_at": ..., "rows": {"0": {cik_str, ticker, title}, ...}}
const obj = JSON.parse(raw);
// The actual rows are under .rows; fall back to top-level if no wrapper.
const rows = obj.rows ?? obj;
return Object.values(rows)
.filter((r: any) => r && typeof r === "object" && r.ticker)
.map((row: any, i: number) => ({
id: `sec_${row.ticker ?? i}`,
text: `Ticker: ${row.ticker}\nCompany: ${row.title ?? "?"}\nCIK: ${row.cik_str ?? "?"}`,
}));
}
function extractLlmTeamRuns(raw: string): Doc[] {
return raw.split("\n").filter(l => l.trim()).map((line, i) => {
try {
const r = JSON.parse(line);
const text = [
`Team run ${r.id ?? i}`,
`Mode: ${r.mode ?? "?"} Created: ${r.created_at ?? "?"}`,
r.prompt ? `Prompt: ${truncate(r.prompt, 1200)}` : "",
r.input ? `Input: ${truncate(typeof r.input === "string" ? r.input : JSON.stringify(r.input), 1200)}` : "",
r.output ? `Output: ${truncate(typeof r.output === "string" ? r.output : JSON.stringify(r.output), 2000)}` : "",
r.result ? `Result: ${truncate(typeof r.result === "string" ? r.result : JSON.stringify(r.result), 2000)}` : "",
r.metadata ? `Meta: ${truncate(JSON.stringify(r.metadata), 600)}` : "",
].filter(Boolean).join("\n");
return { id: `team_run_${r.id ?? i}`, text };
} catch {
return { id: `team_run_${i}`, text: line.slice(0, 2000) };
}
});
}
function extractLlmTeamResponseCache(raw: string): Doc[] {
return raw.split("\n").filter(l => l.trim()).map((line, i) => {
try {
const r = JSON.parse(line);
const text = [
`Cached response ${r.cache_key ?? r.id ?? i}`,
`Created: ${r.created_at ?? "?"}`,
r.prompt ? `Prompt: ${sanitize(truncate(r.prompt, 1500))}` : "",
r.response ? `Response: ${sanitize(truncate(r.response, 2500))}` : "",
r.model ? `Model: ${r.model}` : "",
].filter(Boolean).join("\n");
return { id: `resp_${r.cache_key ?? r.id ?? i}`, text };
} catch {
return { id: `resp_${i}`, text: sanitize(line.slice(0, 2000)) };
}
});
}
const SOURCES: SourceSpec[] = [
{ name: "chicago", index_name: "chicago_permits_v1", s3_key: "chicago/permits_2026-04-25.json",
source_label: "chicago_permits", chunk_size: 600, overlap: 80, extractor: extractChicagoPermits },
{ name: "entities", index_name: "entity_brief_v1", s3_key: "entities/entities.jsonl",
source_label: "entity_brief", chunk_size: 500, overlap: 60, extractor: extractEntities },
{ name: "sec", index_name: "sec_tickers_v1", s3_key: "sec/company_tickers.json",
source_label: "sec_tickers", chunk_size: 200, overlap: 20, extractor: extractSecTickers },
{ name: "llm_team_runs", index_name: "llm_team_runs_v1", s3_key: "llm_team/team_runs.jsonl",
source_label: "llm_team_runs", chunk_size: 800, overlap: 100, extractor: extractLlmTeamRuns },
{ name: "llm_team_response", index_name: "llm_team_response_cache_v1", s3_key: "llm_team/response_cache.jsonl",
source_label: "llm_team_response_cache", chunk_size: 800, overlap: 100, extractor: extractLlmTeamResponseCache },
];
async function vectorizeOne(spec: SourceSpec): Promise<{ ok: boolean; chunks: number; job_id?: string; err?: string }> {
const t0 = Date.now();
console.log(`\n━━━ ${spec.name}${spec.index_name} ━━━`);
console.log(`fetching s3://${RAW_BUCKET}/${spec.s3_key}`);
let local: string;
try { local = await fetchFromRaw(spec.s3_key); }
catch (e: any) { return { ok: false, chunks: 0, err: `fetch: ${e.message}` }; }
console.log(`reading + extracting...`);
const raw = await Bun.file(local).text();
const docs = spec.extractor(raw);
if (docs.length === 0) return { ok: false, chunks: 0, err: "0 docs after extraction" };
console.log(` ${docs.length} docs (avg ${Math.round(docs.reduce((a, d) => a + d.text.length, 0) / docs.length)} chars)`);
console.log(`POST /vectors/index ${spec.index_name} ...`);
const resp = await fetch(`${GATEWAY}/vectors/index`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
index_name: spec.index_name,
source: spec.source_label,
documents: docs,
chunk_size: spec.chunk_size,
overlap: spec.overlap,
}),
signal: AbortSignal.timeout(300000),
});
if (!resp.ok) {
const body = await resp.text();
return { ok: false, chunks: 0, err: `HTTP ${resp.status}: ${body.slice(0, 300)}` };
}
const j: any = await resp.json();
const ms = Date.now() - t0;
console.log(` ✓ submitted: job=${j.job_id} chunks=${j.chunks} (extract+submit ${(ms/1000).toFixed(1)}s)`);
return { ok: true, chunks: j.chunks, job_id: j.job_id };
}
async function pollJob(jobId: string): Promise<{ status: string; processed: number; total: number }> {
const r = await fetch(`${GATEWAY}/vectors/jobs/${jobId}`, { signal: AbortSignal.timeout(5000) });
if (!r.ok) return { status: "unknown", processed: 0, total: 0 };
const j: any = await r.json();
return { status: j.status ?? "?", processed: j.processed ?? 0, total: j.total ?? 0 };
}
async function waitForJob(jobId: string, label: string, maxSec = 600): Promise<void> {
const t0 = Date.now();
let lastLog = 0;
while ((Date.now() - t0) / 1000 < maxSec) {
const s = await pollJob(jobId);
if (s.status === "complete" || s.status === "completed" || s.status === "done") {
console.log(`${label} job ${jobId.slice(0,8)} complete (${s.processed}/${s.total} in ${((Date.now()-t0)/1000).toFixed(0)}s)`);
return;
}
if (s.status === "failed" || s.status === "error") {
console.log(`${label} job ${jobId.slice(0,8)} failed at ${s.processed}/${s.total}`);
return;
}
if (Date.now() - lastLog > 15000) {
console.log(` · ${label} progress ${s.processed}/${s.total} (${s.status})`);
lastLog = Date.now();
}
await new Promise(r => setTimeout(r, 3000));
}
console.log(`${label} job ${jobId.slice(0,8)} still running after ${maxSec}s — leaving in background`);
}
async function main() {
const args = process.argv.slice(2);
const targets = args.length > 0 ? SOURCES.filter(s => args.includes(s.name)) : SOURCES;
console.log(`Vectorizing ${targets.length} source(s): ${targets.map(t => t.name).join(", ")}`);
const results: Array<{ name: string; result: any }> = [];
for (const spec of targets) {
try {
const r = await vectorizeOne(spec);
if (r.ok && r.job_id) await waitForJob(r.job_id, spec.name);
results.push({ name: spec.name, result: r });
} catch (e: any) {
console.error(`! ${spec.name}: ${e.message}`);
results.push({ name: spec.name, result: { ok: false, err: e.message } });
}
}
console.log(`\n━━━ SUMMARY ━━━`);
for (const { name, result } of results) {
console.log(` ${result.ok ? "✓" : "✗"} ${name.padEnd(20)} chunks=${result.chunks ?? 0} ${result.err ? `err=${result.err}` : ""}`);
}
}
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });

View File

@ -1,353 +0,0 @@
#!/usr/bin/env bun
// Agent harness — runs local qwen3.5:latest as an autonomous agent
// against PRD.md. Exposes a tool-call loop. Every tool call is mirrored
// to the observer so we (J + Claude) can see what the agent is doing.
//
// Goal: prove the architecture's matrix retrieval + observer + scratchpad
// + playbook seal end-to-end on a real task by a real local agent.
//
// Iter 1: just run it. Watch where it gets stuck.
// Iter N: tune helpers based on what we observed.
import { appendFile, readFile } from "node:fs/promises";
import { existsSync, mkdirSync } from "node:fs";
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const OBSERVER = "http://localhost:3800";
const PRD_PATH = "/home/profit/lakehouse/tests/agent_test/PRD.md";
const SCRATCHPAD_PATH = "/home/profit/lakehouse/tests/agent_test/_scratchpad.txt";
const TRACE_PATH = "/home/profit/lakehouse/tests/agent_test/_trace.jsonl";
const FINAL_PATH = "/home/profit/lakehouse/tests/agent_test/_final.md";
const PERMITS_RAW = "/tmp/vectorize_raw/chicago_permits_2026-04-25.json";
const AGENT_MODEL = process.env.AGENT_MODEL ?? "qwen3.5:latest";
const MAX_STEPS = Number(process.env.AGENT_MAX_STEPS ?? 15);
const SESSION_ID = `agent_${Date.now().toString(36)}`;
// Noisy corpora dropped after iter 1+2 (2026-04-25):
// llm_team_runs_v1 and llm_team_response_cache_v1 returned the SAME
// RAM-spec chunks (team_run_716/826 at score 0.59) regardless of query.
// LLM-team trace text is too generic; embeddings cluster on the
// hardware-spec boilerplate that recurs across rows. Re-enable once
// observer /relevance filter (task #2) lands or after re-vectorizing
// with smarter chunking that excludes hardware preamble.
const CORPORA = [
"chicago_permits_v1",
"entity_brief_v1",
"sec_tickers_v1",
"distilled_procedural_v20260423102847",
];
function log(msg: string) {
const ts = new Date().toISOString().slice(11, 19);
console.log(`[harness ${ts}] ${msg}`);
}
async function emitObserverEvent(payload: object) {
try {
await fetch(`${OBSERVER}/event`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ source: "agent_test", session_id: SESSION_ID, ...payload, ts: new Date().toISOString() }),
signal: AbortSignal.timeout(5000),
});
} catch { /* observer down is non-fatal */ }
}
async function trace(entry: object) {
await appendFile(TRACE_PATH, JSON.stringify({ ts: new Date().toISOString(), session_id: SESSION_ID, ...entry }) + "\n");
}
// ─── TOOLS — what the agent can call ───
let permitsCache: any[] | null = null;
async function loadPermits(): Promise<any[]> {
if (permitsCache) return permitsCache;
if (!existsSync(PERMITS_RAW)) {
// Fetch from raw bucket via mc
const proc = Bun.spawn(["mc", "cp", "-q", "local/raw/chicago/permits_2026-04-25.json", PERMITS_RAW]);
await proc.exited;
}
permitsCache = JSON.parse(await readFile(PERMITS_RAW, "utf8"));
return permitsCache!;
}
async function tool_list_permits(args: { min_cost?: number; permit_type?: string }): Promise<string> {
const all = await loadPermits();
let filtered = all.filter(p => p.contact_1_name || p.contact_2_name);
if (args.min_cost) filtered = filtered.filter(p => Number(p.reported_cost ?? 0) >= args.min_cost!);
if (args.permit_type) filtered = filtered.filter(p => (p.permit_type ?? "").toLowerCase().includes(args.permit_type!.toLowerCase()));
filtered.sort((a, b) => Number(b.reported_cost ?? 0) - Number(a.reported_cost ?? 0));
const out = filtered.slice(0, 5).map(p =>
`- permit_id=${p.permit_} type=${p.permit_type} cost=$${Number(p.reported_cost ?? 0).toLocaleString()} contractor=${p.contact_1_name ?? "?"}`
).join("\n");
return `Top ${Math.min(5, filtered.length)} of ${filtered.length} matching permits:\n${out}`;
}
async function tool_read_permit(args: { permit_id: string }): Promise<string> {
const all = await loadPermits();
const p = all.find(x => x.permit_ === args.permit_id);
if (!p) return `permit ${args.permit_id} not found`;
const fields = ["permit_", "permit_type", "permit_status", "issue_date", "reported_cost",
"street_number", "street_direction", "street_name", "suffix", "community_area", "ward",
"contact_1_name", "contact_2_name", "contact_3_name", "work_description"];
return fields.map(f => `${f}: ${p[f] ?? ""}`).join("\n");
}
async function tool_query_matrix(args: { query: string; top_k?: number }): Promise<string> {
const k = args.top_k ?? 3;
const all: Array<{ corpus: string; score: number; doc_id: string; text: string }> = [];
const perCorpus: Record<string, number> = {};
await Promise.all(CORPORA.map(async (corpus) => {
try {
const r = await fetch(`${GATEWAY}/vectors/search`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ index_name: corpus, query: args.query, top_k: k }),
signal: AbortSignal.timeout(10000),
});
if (!r.ok) { perCorpus[corpus] = -1; return; }
const data: any = await r.json();
const results = data.results ?? [];
perCorpus[corpus] = results.length;
for (const h of results) {
all.push({ corpus, score: Number(h.score ?? 0), doc_id: String(h.doc_id ?? "?"), text: String(h.chunk_text ?? "").slice(0, 300) });
}
} catch { perCorpus[corpus] = -1; }
}));
all.sort((a, b) => b.score - a.score);
const top = all.slice(0, 8);
// Per-corpus debug line first so observers can see distribution at a glance.
const dist = Object.entries(perCorpus).map(([k, v]) => `${k.split("_v")[0]}=${v}`).join(" ");
if (top.length === 0) return `no matrix evidence for: ${args.query}\n(per-corpus: ${dist})`;
return `(per-corpus: ${dist})\n` + top.map((h, i) => `[${i + 1}] ${h.corpus} score=${h.score.toFixed(2)} doc=${h.doc_id}\n ${h.text.replace(/\s+/g, " ").trim()}`).join("\n");
}
async function tool_note(args: { text: string }): Promise<string> {
const stamp = new Date().toISOString().slice(11, 19);
await appendFile(SCRATCHPAD_PATH, `[${stamp}] ${args.text}\n`);
return `noted (${args.text.length} chars)`;
}
async function tool_read_scratchpad(): Promise<string> {
if (!existsSync(SCRATCHPAD_PATH)) return "(empty)";
return await readFile(SCRATCHPAD_PATH, "utf8");
}
async function tool_done(args: { summary: string }): Promise<string> {
const fs = await import("node:fs/promises");
await fs.writeFile(FINAL_PATH, args.summary);
return `done; final saved to ${FINAL_PATH} (${args.summary.length} chars)`;
}
const TOOLS: Record<string, (args: any) => Promise<string>> = {
list_permits: tool_list_permits,
read_permit: tool_read_permit,
query_matrix: tool_query_matrix,
note: tool_note,
read_scratchpad: tool_read_scratchpad,
done: tool_done,
};
const TOOL_SCHEMA = `Available tools (call by emitting JSON like: {"tool": "name", "args": {...}}):
- list_permits(min_cost?: number, permit_type?: string) top 5 by cost
- read_permit(permit_id: string) full permit fields
- query_matrix(query: string, top_k?: number) search KB
- note(text: string) append to scratchpad
- read_scratchpad() read your scratchpad
- done(summary: string) finish; pass final markdown analysis`;
// ─── AGENT LOOP ───
async function callAgent(messages: Array<{role: string; content: string}>): Promise<string> {
// think:false disables hidden reasoning so all generated tokens go to
// visible response. qwen3.5:latest defaults to thinking and silently
// burns the token budget otherwise.
const r = await fetch(`${SIDECAR}/generate`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
model: AGENT_MODEL,
prompt: messages.map(m => `${m.role.toUpperCase()}:\n${m.content}`).join("\n\n") + "\n\nASSISTANT:\n",
stream: false,
max_tokens: 1500,
think: false,
}),
signal: AbortSignal.timeout(180000),
});
if (!r.ok) throw new Error(`agent ${r.status}: ${(await r.text()).slice(0, 200)}`);
const j: any = await r.json();
return String(j.text ?? j.response ?? "").trim();
}
function extractToolCall(response: string): { tool: string; args: any } | null {
// Look for JSON block in the response
const fenced = response.match(/```(?:json)?\s*(\{[\s\S]+?\})\s*```/);
const candidate = fenced ? fenced[1] : (response.match(/\{[\s\S]*\}/)?.[0] ?? null);
if (!candidate) return null;
try {
const parsed = JSON.parse(candidate);
if (parsed.tool && typeof parsed.tool === "string") return { tool: parsed.tool, args: parsed.args ?? {} };
} catch { /* not JSON */ }
return null;
}
async function main() {
log(`session=${SESSION_ID} model=${AGENT_MODEL} max_steps=${MAX_STEPS}`);
// Reset workspace files for this session
for (const p of [SCRATCHPAD_PATH, TRACE_PATH, FINAL_PATH]) {
try { await Bun.write(p, ""); } catch { /* ignore */ }
}
const prd = await readFile(PRD_PATH, "utf8");
log(`loaded PRD (${prd.length} chars)`);
await emitObserverEvent({ event_kind: "agent_start", model: AGENT_MODEL });
// Pre-flight: pull prior accepted pathway traces for this task class
// and surface them as a "PROVEN APPROACHES" preamble. This closes the
// matrix loop — successful past runs now actively help the next agent.
let priorPlaybooks = "";
try {
const stateFile = Bun.file("/home/profit/lakehouse/data/_pathway_memory/state.json");
if (await stateFile.exists()) {
const state: any = JSON.parse(await stateFile.text());
const matched: any[] = [];
for (const traces of Object.values(state.pathways ?? {}) as any[][]) {
for (const t of traces) {
if (t.task_class === "chicago_permit_analysis" && t.final_verdict === "accepted" && !t.retired) {
matched.push(t);
}
}
}
matched.sort((a, b) => (b.created_at ?? "").localeCompare(a.created_at ?? ""));
if (matched.length > 0) {
const top = matched.slice(0, 2);
priorPlaybooks = "\n\n═══ 📖 PROVEN APPROACHES FROM PRIOR ACCEPTED RUNS ═══\n" +
top.map((t, i) =>
`[${i + 1}] pathway=${t.pathway_id?.slice(0, 12)} previously succeeded on ${t.file_path}\n` +
`Approach excerpt:\n${(t.reducer_summary ?? "").slice(0, 800)}`
).join("\n\n") +
"\n═══ end proven approaches ═══\n\nUse these as REFERENCE for what worked. Don't copy verbatim, but follow the same workflow shape (plan → list → read → matrix → analyze → done).\n";
log(`📖 found ${matched.length} prior accepted pathway(s) for chicago_permit_analysis — top ${top.length} prepended to agent context`);
} else {
log(`📖 no prior accepted pathways for chicago_permit_analysis (this is the first run)`);
}
}
} catch (e: any) {
log(`📖 pathway preamble skipped: ${e.message}`);
}
const systemMsg = `You are an autonomous agent. Read the PRD below and follow its instructions exactly.
${TOOL_SCHEMA}
To call a tool, respond with ONLY a JSON object: {"tool": "<name>", "args": {...}}
No markdown, no explanation around it. The harness will execute the tool and give you the result, then ask you what to do next.
When you are completely finished, call done(summary="<your final markdown>").`;
const messages: Array<{role: string; content: string}> = [
{ role: "system", content: systemMsg },
{ role: "user", content: `PRD:\n\n${prd}${priorPlaybooks}\n\nNow respond. Remember: PLAN first via note() before executing.` },
];
// Iter 3 surfaced: when the matrix returns real evidence, the agent
// gets analysis paralysis — keeps calling note() to refine instead of
// producing the final output. Guard: after MAX_CONSECUTIVE_NOTES
// note() calls in a row, harness injects a hard-stop user message
// telling the agent it MUST call done() next.
const MAX_CONSECUTIVE_NOTES = Number(process.env.AGENT_MAX_CONSECUTIVE_NOTES ?? 2);
let consecutiveNotes = 0;
let isDone = false;
for (let step = 1; step <= MAX_STEPS && !isDone; step++) {
log(`step ${step}/${MAX_STEPS} — calling agent...`);
const t0 = Date.now();
let response: string;
try {
response = await callAgent(messages);
} catch (e: any) {
log(` ✗ agent error: ${e.message}`);
await trace({ step, kind: "error", error: e.message });
await emitObserverEvent({ event_kind: "agent_error", step, error: e.message });
break;
}
const ms = Date.now() - t0;
log(` · agent responded ${response.length} chars in ${ms}ms`);
await trace({ step, kind: "agent_response", chars: response.length, latency_ms: ms, response: response.slice(0, 4000) });
const call = extractToolCall(response);
if (!call) {
log(` ⚠ no tool call extracted from response — agent may be confused`);
await trace({ step, kind: "no_tool_call", preview: response.slice(0, 500) });
await emitObserverEvent({ event_kind: "agent_no_tool", step, preview: response.slice(0, 200) });
// Push the agent: tell it to call a tool
messages.push({ role: "assistant", content: response });
messages.push({ role: "user", content: `Your last response did not contain a valid tool call. Respond with ONLY a JSON object like {"tool": "note", "args": {"text": "..."}}. No prose around it.` });
continue;
}
log(` → tool: ${call.tool}(${JSON.stringify(call.args).slice(0, 200)})`);
if (!TOOLS[call.tool]) {
const err = `unknown tool: ${call.tool}`;
log(`${err}`);
await trace({ step, kind: "tool_unknown", tool: call.tool });
await emitObserverEvent({ event_kind: "tool_unknown", step, tool: call.tool });
messages.push({ role: "assistant", content: response });
messages.push({ role: "user", content: `Tool "${call.tool}" does not exist. Available: ${Object.keys(TOOLS).join(", ")}. Try again.` });
continue;
}
const resStart = Date.now();
let result: string;
try {
result = await TOOLS[call.tool](call.args);
} catch (e: any) {
result = `TOOL ERROR: ${e.message}`;
}
const resMs = Date.now() - resStart;
log(`${result.slice(0, 200)}${result.length > 200 ? "..." : ""} (${resMs}ms)`);
await trace({ step, kind: "tool_call", tool: call.tool, args: call.args, result: result.slice(0, 4000), latency_ms: resMs });
await emitObserverEvent({ event_kind: "tool_call", step, tool: call.tool, result_chars: result.length });
if (call.tool === "done") {
isDone = true;
log(` ✓ DONE`);
await emitObserverEvent({ event_kind: "agent_done", step });
break;
}
// Track consecutive note() calls; force done() if too many in a row.
if (call.tool === "note") consecutiveNotes++;
else consecutiveNotes = 0;
messages.push({ role: "assistant", content: response });
if (consecutiveNotes >= MAX_CONSECUTIVE_NOTES) {
log(`${consecutiveNotes} consecutive note() calls — forcing done() next`);
await emitObserverEvent({ event_kind: "force_done_pressure", step, consecutive_notes: consecutiveNotes });
messages.push({ role: "user", content: `Tool result:\n${result}\n\nYou have called note() ${consecutiveNotes} times in a row without producing output. STOP NOTING. Call done(summary="<your final markdown>") NOW with whatever analysis you have. Do not call note() again. Respond with ONLY: {"tool": "done", "args": {"summary": "..."}}` });
consecutiveNotes = 0; // reset so we only push once per streak
} else {
messages.push({ role: "user", content: `Tool result:\n${result}\n\nWhat next?` });
}
}
if (!isDone) {
log(`✗ agent did not complete within ${MAX_STEPS} steps`);
await emitObserverEvent({ event_kind: "agent_max_steps", final_step: MAX_STEPS });
// Mem0: any partial trace this session inserted should be retired
// so future agents don't get a broken playbook in their preamble.
// We don't have a trace_uid for this session yet (insert happens
// on done); but if any prior trace has the same workflow shape as
// this session's tool sequence, retire it.
// For now, just log — actual retirement would happen if seal had run.
log(` ⚠ no playbook seal will be performed for failed run`);
}
log(`session ${SESSION_ID} ended. Trace: ${TRACE_PATH}`);
if (existsSync(FINAL_PATH)) log(`Final output: ${FINAL_PATH}`);
}
mkdirSync("/home/profit/lakehouse/tests/agent_test", { recursive: true });
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });

View File

@ -1,214 +0,0 @@
#!/usr/bin/env bun
// Autonomous scrum loop — wraps scrum_master_pipeline.ts + scrum_applier.ts
// in a goal-driven retry loop. Observer is POSTed an iteration summary at
// every boundary so it can build meta-commentary outside the loop's epistemic
// scope.
//
// Usage:
// LOOP_TARGETS="crates/a/src/x.rs,crates/b/src/y.rs" \
// LOOP_MAX_ITERS=5 \
// LOOP_PUSH=1 \
// bun run tests/real-world/autonomous_loop.ts
//
// Stop conditions: max_iters reached OR 2 consecutive iters with 0 commits.
import { spawn } from "node:child_process";
import { appendFile, readFile } from "node:fs/promises";
import { existsSync } from "node:fs";
const REPO = "/home/profit/lakehouse";
const OBSERVER = process.env.LOOP_OBSERVER ?? "http://localhost:3800";
const BRANCH = process.env.LOOP_BRANCH ?? "scrum/auto-apply-19814";
const MAX_ITERS = Number(process.env.LOOP_MAX_ITERS ?? 3);
const PUSH = process.env.LOOP_PUSH === "1";
const MIN_CONF = process.env.LOOP_MIN_CONF ?? "85";
// Optional override — when unset, let scrum_applier.ts use ITS default
// (currently x-ai/grok-4.1-fast on openrouter). The prior hardcoded
// qwen3-coder:480b default was clobbering the applier's own default
// and forcing every iter to hit the throttled ollama_cloud account.
const APPLIER_MODEL = process.env.LOOP_APPLIER_MODEL;
const APPLIER_PROVIDER = process.env.LOOP_APPLIER_PROVIDER;
const TARGETS = (process.env.LOOP_TARGETS ?? "crates/queryd/src/service.rs,crates/gateway/src/main.rs,crates/gateway/src/v1/mod.rs")
.split(",").map(s => s.trim()).filter(Boolean);
const FORENSIC = process.env.LH_SCRUM_FORENSIC ?? `${REPO}/docs/SCRUM_FORENSIC_PROMPT.md`;
const PROPOSAL = process.env.LH_SCRUM_PROPOSAL ?? `${REPO}/docs/SCRUM_FIX_WAVE.md`;
const LOOP_ID = `loop_${Date.now().toString(36)}`;
const JOURNAL = `${REPO}/data/_kb/autonomous_loops.jsonl`;
interface IterResult {
iter: number;
scrum_reviews_added: number;
applier_outcomes: Record<string, number>;
commits_landed: number;
commit_shas: string[];
build_status: "green" | "red" | "unknown";
duration_ms: number;
}
function log(msg: string) {
const ts = new Date().toISOString().slice(11, 19);
console.log(`[loop ${LOOP_ID} ${ts}] ${msg}`);
}
function runCmd(cmd: string, args: string[], env: Record<string, string> = {}): Promise<{ code: number; stdout: string; stderr: string }> {
return new Promise((resolve) => {
const child = spawn(cmd, args, { cwd: REPO, env: { ...process.env, ...env } });
let stdout = "", stderr = "";
child.stdout.on("data", (d) => { stdout += d; process.stdout.write(d); });
child.stderr.on("data", (d) => { stderr += d; process.stderr.write(d); });
child.on("close", (code) => resolve({ code: code ?? -1, stdout, stderr }));
});
}
async function countLines(path: string): Promise<number> {
if (!existsSync(path)) return 0;
const text = await readFile(path, "utf8");
return text.split("\n").filter(Boolean).length;
}
async function gitHeadSha(): Promise<string> {
const r = await runCmd("git", ["rev-parse", "HEAD"]);
return r.stdout.trim();
}
async function commitsSince(baseSha: string): Promise<string[]> {
const r = await runCmd("git", ["log", "--oneline", `${baseSha}..HEAD`]);
return r.stdout.trim().split("\n").filter(Boolean);
}
async function cargoCheckGreen(): Promise<boolean> {
log("cargo check --workspace …");
const r = await runCmd("cargo", ["check", "--workspace", "--quiet"]);
return r.code === 0;
}
async function postObserver(payload: object) {
try {
const r = await fetch(`${OBSERVER}/event`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(payload),
signal: AbortSignal.timeout(5000),
});
if (!r.ok) log(`observer POST returned ${r.status}`);
} catch (e: any) {
log(`observer POST failed: ${e.message}`);
}
}
async function runIter(iter: number, baseSha: string): Promise<IterResult> {
const t0 = Date.now();
log(`══ iter ${iter} start (base ${baseSha.slice(0, 8)}) targets=${TARGETS.length}`);
const reviewsBefore = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
const applyBefore = await countLines(`${REPO}/data/_kb/auto_apply.jsonl`);
log(`scrum_master_pipeline.ts → ${TARGETS.length} files`);
await runCmd("bun", ["run", "tests/real-world/scrum_master_pipeline.ts"], {
LH_SCRUM_FILES: TARGETS.join(","),
LH_SCRUM_FORENSIC: FORENSIC,
LH_SCRUM_PROPOSAL: PROPOSAL,
});
log(`scrum_applier.ts COMMIT=1 MIN_CONF=${MIN_CONF} files=${TARGETS.length}`);
// Only forward model/provider when explicitly overridden — otherwise
// let scrum_applier.ts use its own defaults (Grok 4.1 fast on openrouter).
const applierEnv: Record<string, string> = {
LH_APPLIER_COMMIT: "1",
LH_APPLIER_MIN_CONF: MIN_CONF,
LH_APPLIER_MAX_FILES: String(TARGETS.length),
LH_APPLIER_BRANCH: BRANCH,
// Constrain applier to THIS iter's targets so it patches what we
// just reviewed instead of the highest-confidence file from history.
LH_APPLIER_FILES: TARGETS.join(","),
};
if (APPLIER_MODEL) applierEnv.LH_APPLIER_MODEL = APPLIER_MODEL;
if (APPLIER_PROVIDER) applierEnv.LH_APPLIER_PROVIDER = APPLIER_PROVIDER;
await runCmd("bun", ["run", "tests/real-world/scrum_applier.ts"], applierEnv);
const reviewsAfter = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
const applyAfterText = existsSync(`${REPO}/data/_kb/auto_apply.jsonl`)
? await readFile(`${REPO}/data/_kb/auto_apply.jsonl`, "utf8")
: "";
const applyRows = applyAfterText.split("\n").filter(Boolean).slice(applyBefore);
const outcomes: Record<string, number> = {};
for (const line of applyRows) {
try {
const o = JSON.parse(line);
outcomes[o.action ?? "?"] = (outcomes[o.action ?? "?"] ?? 0) + 1;
} catch { /* skip malformed */ }
}
const commitShas = await commitsSince(baseSha);
const buildStatus = commitShas.length > 0 ? (await cargoCheckGreen() ? "green" : "red") : "unknown";
const result: IterResult = {
iter,
scrum_reviews_added: reviewsAfter - reviewsBefore,
applier_outcomes: outcomes,
commits_landed: commitShas.length,
commit_shas: commitShas.map(s => s.split(" ")[0]),
build_status: buildStatus,
duration_ms: Date.now() - t0,
};
log(`iter ${iter} done — reviews+${result.scrum_reviews_added} commits=${result.commits_landed} build=${buildStatus} (${(result.duration_ms / 1000).toFixed(1)}s)`);
await postObserver({
source: "autonomous_loop",
loop_id: LOOP_ID,
event_kind: "iteration_complete",
iter,
targets: TARGETS,
success: buildStatus !== "red",
scrum_reviews_added: result.scrum_reviews_added,
applier_outcomes: result.applier_outcomes,
commits_landed: result.commits_landed,
commit_shas: result.commit_shas,
build_status: buildStatus,
duration_ms: result.duration_ms,
ts: new Date().toISOString(),
});
await appendFile(JOURNAL, JSON.stringify({ loop_id: LOOP_ID, ...result, ts: new Date().toISOString() }) + "\n");
return result;
}
async function main() {
log(`autonomous loop starting · branch=${BRANCH} max_iters=${MAX_ITERS} push=${PUSH}`);
log(`targets: ${TARGETS.join(", ")}`);
const branchR = await runCmd("git", ["branch", "--show-current"]);
if (branchR.stdout.trim() !== BRANCH) {
log(`ERROR: on branch ${branchR.stdout.trim()}, expected ${BRANCH}. Refusing to run.`);
process.exit(1);
}
let consecutiveZero = 0;
for (let iter = 1; iter <= MAX_ITERS; iter++) {
const baseSha = await gitHeadSha();
const result = await runIter(iter, baseSha);
if (PUSH && result.commits_landed > 0) {
log(`git push origin ${BRANCH}`);
const pushR = await runCmd("git", ["push", "origin", BRANCH]);
if (pushR.code !== 0) log(`push failed (continuing): ${pushR.stderr.slice(0, 200)}`);
}
consecutiveZero = result.commits_landed === 0 ? consecutiveZero + 1 : 0;
if (consecutiveZero >= 2) {
log(`STOP: 2 consecutive iters with 0 commits. Loop converged or stuck.`);
break;
}
}
log(`loop ${LOOP_ID} complete. Journal: ${JOURNAL}`);
}
main().catch((e) => {
log(`FATAL: ${e.message}`);
process.exit(1);
});

View File

@ -35,12 +35,6 @@ const AUDIT_LOG = `${REPO}/data/_kb/auto_apply.jsonl`;
const MIN_CONF = Number(process.env.LH_APPLIER_MIN_CONF ?? 90);
const MAX_FILES = Number(process.env.LH_APPLIER_MAX_FILES ?? 5);
const COMMIT = process.env.LH_APPLIER_COMMIT === "1";
// LH_APPLIER_FILES — comma-separated repo-relative paths. When set,
// constrains eligible reviews to ONLY those files. Used by the autonomous
// loop so the applier patches what scrum just reviewed in this iter,
// instead of pulling the highest-confidence file from global review history.
const TARGET_FILES = (process.env.LH_APPLIER_FILES ?? "")
.split(",").map(s => s.trim()).filter(Boolean);
// Default patch-emitter model — qwen3-coder:480b is the coding specialist
// in the scrum ladder (rung 2). Swapped in from kimi-k2:1t after 2026-04-24
// data showed kimi-k2:1t produces architectural patches that cascade across
@ -48,13 +42,7 @@ const TARGET_FILES = (process.env.LH_APPLIER_FILES ?? "")
// for targeted code changes and tends to stay within the mechanical-patch
// constraint the prompt asks for. LLM Team's /api/run?mode=patch would be
// the ideal choice but that mode isn't registered in llm_team_ui.py yet.
// Default patch emitter swapped to OpenRouter Grok 4.1 fast (2026-04-25)
// after observing the prior default (ollama_cloud::qwen3-coder:480b) sit
// at 429 throttle and never produce patches. Grok 4.1 fast: $0.20/$0.50
// per M, 2M ctx, proven to emit precise structured patches in observer
// hand-review tests. Override with LH_APPLIER_MODEL + LH_APPLIER_PROVIDER.
const MODEL = process.env.LH_APPLIER_MODEL ?? "x-ai/grok-4.1-fast";
const PROVIDER = (process.env.LH_APPLIER_PROVIDER ?? "openrouter") as "ollama_cloud" | "openrouter" | "ollama";
const MODEL = process.env.LH_APPLIER_MODEL ?? "qwen3-coder:480b";
const BRANCH = process.env.LH_APPLIER_BRANCH ?? `scrum/auto-apply-${Date.now().toString(36)}`;
// Deny-list — anything whose path starts with one of these is skipped
@ -212,7 +200,7 @@ ${source.slice(0, 14000)}
Emit ONLY the JSON object.`;
const r = await chat({ provider: PROVIDER, model: MODEL, prompt, max_tokens: 2500 });
const r = await chat({ provider: "ollama_cloud", model: MODEL, prompt, max_tokens: 2500 });
if (r.error || !r.content) return [];
// Strip markdown fences if model wrapped the JSON.
@ -346,14 +334,10 @@ async function main() {
log(`loaded ${reviews.size} latest reviews`);
const eligible = [...reviews.values()].filter(r =>
passesConfidenceGate(r) && passesDenyList(r.file) &&
(TARGET_FILES.length === 0 || TARGET_FILES.includes(r.file))
passesConfidenceGate(r) && passesDenyList(r.file)
).sort((a, b) => (b.confidence_avg ?? 0) - (a.confidence_avg ?? 0));
if (TARGET_FILES.length > 0) {
log(`LH_APPLIER_FILES set — constrained to ${TARGET_FILES.length} target file(s): ${TARGET_FILES.join(", ")}`);
}
log(`${eligible.length} pass confidence gate + deny-list${TARGET_FILES.length > 0 ? " + target filter" : ""}`);
log(`${eligible.length} pass confidence gate + deny-list`);
log(`taking top ${Math.min(MAX_FILES, eligible.length)} by confidence`);
// Establish pre-run warning baseline so post-patch cargo check can

File diff suppressed because it is too large Load Diff