Compare commits

13 Commits

Author SHA1 Message Date
root
779158a09b scripts: chicago analyzer field-name fixes + vectorize sanitizer hardening
Some checks failed
lakehouse/auditor 1 blocking issue: todo!() macro call in tests/real-world/scrum_master_pipeline.ts
Two small fixes surfaced during smoke testing:

analyze_chicago_contracts.ts: permit field is contact_1_name not
contact_1; reported_cost is integer-string. Fixed filter (was rejecting
all 2853 permits) and contractor extraction (was empty).
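
A minimal sketch of the corrected filter, assuming permits are plain objects keyed by the two field names above. `Permit`, `parseCost`, `isEligible`, and the `minCost` threshold are hypothetical names for illustration, not the script's actual code.

```typescript
// Hypothetical permit shape: only the two fields named in the commit message.
interface Permit {
  contact_1_name?: string;
  reported_cost?: string; // integer encoded as a string, e.g. "250000"
}

// Parse an integer-string cost; returns null when missing or not a plain integer.
function parseCost(raw: string | undefined): number | null {
  if (raw === undefined) return null;
  const trimmed = raw.trim();
  if (!/^\d+$/.test(trimmed)) return null;
  return parseInt(trimmed, 10);
}

// Keep permits that name a contractor and meet the (assumed) cost threshold.
function isEligible(p: Permit, minCost: number): boolean {
  const name = (p.contact_1_name ?? "").trim();
  const cost = parseCost(p.reported_cost);
  return name.length > 0 && cost !== null && cost >= minCost;
}
```

Comparing the raw string against a number, or reading `contact_1` instead of `contact_1_name`, would reject every record, which matches the symptom described above.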

vectorize_raw_corpus.ts: sanitize() expanded to strip control chars +
ALL backslashes (kills incomplete \uXXXX escapes) + UTF-16 surrogates
(unpaired surrogates from emoji split by truncate boundary). Llm_team
response cache had docs with all three pollution shapes.
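
The three pollution shapes can be handled in a single pass over UTF-16 code units. This is a sketch, not the script's `sanitize()`: it assumes tab and newline are worth keeping and that only unpaired surrogates are dropped (well-formed emoji pairs survive).

```typescript
// Sketch of a sanitizer covering the three pollution shapes listed above.
// Assumption: tab and newline are kept; other C0 control chars and DEL are dropped.
function sanitize(input: string): string {
  let out = "";
  for (let i = 0; i < input.length; i++) {
    const c = input.charCodeAt(i);
    const isControl = (c < 0x20 && c !== 0x09 && c !== 0x0a) || c === 0x7f;
    if (isControl || c === 0x5c) continue; // control char or backslash
    if (c >= 0xd800 && c <= 0xdbff) {
      // High surrogate: keep it only when a low surrogate follows.
      const next = i + 1 < input.length ? input.charCodeAt(i + 1) : 0;
      if (next >= 0xdc00 && next <= 0xdfff) {
        out += input.slice(i, i + 2);
        i++; // skip the low half that was just copied
      }
      continue;
    }
    if (c >= 0xdc00 && c <= 0xdfff) continue; // low surrogate with no preceding high
    out += input.charAt(i);
  }
  return out;
}
```

Removing every backslash is lossy, but it guarantees that a truncated `\uXXXX` escape can never reach a JSON parser downstream.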

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:34:45 -05:00
root
6ac7f61819 pathway_memory: Mem0 versioning + deletion (upsert/revise/retire/history)
Per J 2026-04-25: pathway_memory was append-only — every agent run added
a new trace, bad/failed runs polluted the matrix forever, no notion of
"this is the canonical evolved playbook." Ported playbook_memory's
Phase 25/27 patterns into pathway_memory so the agent loop's matrix
converges on best-known approaches per task class instead of bloating.

Fields added to PathwayTrace (all #[serde(default)] for back-compat):
- trace_uid: stable UUID per individual trace within a bucket
- version: u32 default 1
- parent_trace_uid, superseded_at, superseded_by_trace_uid
- retirement_reason (paired with existing retired:bool)

Methods added to PathwayMemory:
- upsert(trace) → PathwayUpsertOutcome {Added|Updated|Noop}
  Workflow-fingerprint dedup: ladder_attempts + final_verdict hash.
  Identical workflow → bumps existing replay_count instead of duplicating.
- revise(parent_uid, new_trace) → PathwayReviseOutcome
  Chains versions; rejects retired or already-superseded parents.
- retire(trace_uid, reason) → bool
  Marks specific trace retired with reason. Idempotent.
- history(trace_uid) → Vec<PathwayTrace>
  Walks parent_trace_uid back to root, then superseded_by forward to tip.
  Cycle-safe via visited set.

Retrieval gates updated:
- query_hot_swap skips superseded_at.is_some()
- bug_fingerprints_for skips both retired AND superseded

HTTP endpoints in service.rs:
- POST /vectors/pathway/upsert
- POST /vectors/pathway/retire
- POST /vectors/pathway/revise
- GET  /vectors/pathway/history/{trace_uid}
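
A client-side sketch of how these endpoints might be called. The JSON field names (`trace_uid`, `reason`, `parent_trace_uid`, `new_trace`) follow the request structs in the service.rs diff; the base URL and the `HttpCall` descriptor are assumptions made so the example needs no network access.

```typescript
// Describes an HTTP call without performing it (hypothetical helper type).
interface HttpCall {
  method: "GET" | "POST";
  url: string;
  body?: string;
}

// Assumed gateway base URL; adjust for the actual deployment.
const BASE = "http://localhost:3100";

// POST /vectors/pathway/retire with {trace_uid, reason}.
function retireCall(traceUid: string, reason: string): HttpCall {
  return {
    method: "POST",
    url: `${BASE}/vectors/pathway/retire`,
    body: JSON.stringify({ trace_uid: traceUid, reason }),
  };
}

// POST /vectors/pathway/revise with {parent_trace_uid, new_trace}.
function reviseCall(parentTraceUid: string, newTrace: Record<string, unknown>): HttpCall {
  return {
    method: "POST",
    url: `${BASE}/vectors/pathway/revise`,
    body: JSON.stringify({ parent_trace_uid: parentTraceUid, new_trace: newTrace }),
  };
}

// GET /vectors/pathway/history/{trace_uid}; the uid is URL-encoded defensively.
function historyCall(traceUid: string): HttpCall {
  return {
    method: "GET",
    url: `${BASE}/vectors/pathway/history/${encodeURIComponent(traceUid)}`,
  };
}
```

Each descriptor can be handed to `fetch(call.url, { method: call.method, body: call.body, headers: { "Content-Type": "application/json" } })` by the caller.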

scripts/seal_agent_playbook.ts switched insert→upsert + accepts SESSION_DIR
arg so it can seal any archived session, not just iter4.

Verified live (4/4 ops):
- UPSERT first run: Added trace_uid 542ae53f
- UPSERT identical: Updated, replay_count bumped 0→1 (no duplicate)
- REVISE 542ae53f→87a70a61: parent stamped superseded_at, v2 created
- HISTORY of v2: chain_len=2, v1 superseded, v2 tip
- RETIRE iter-6 broken trace: retired=true, retirement_reason preserved
- pathway_memory.stats: total=79, retired=1, reuse_rate=0.0127

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:31:44 -05:00
root
ed83754f20 raw-corpus dump + vectorization + chicago contract inference pipeline
Three new pieces, executed in order:

scripts/dump_raw_corpus.sh
- One-shot bash that creates MinIO bucket `raw` and uploads all
  testing corpora as a persistent immutable test set. 365 MB total
  across 5 prefixes (chicago, entities, sec, staffing, llm_team)
  + MANIFEST.json. Sources: workers_500k.parquet (309 MB),
  resumes.parquet, entities.jsonl, sec_company_tickers.json,
  Chicago permits last 30d (2,853 records, 5.4 MB), 9 LLM Team
  Postgres tables dumped via row_to_json.

scripts/vectorize_raw_corpus.ts
- Bun script that fetches each raw-bucket source via mc, runs a
  source-specific extractor into {id, text} docs, posts to
  /vectors/index, polls job to completion. Verified results:
    chicago_permits_v1: 3,420 chunks
    entity_brief_v1:    634 chunks
    sec_tickers_v1:    10,341 chunks (after extractor fix for
                        wrapped {rows: {...}} JSON shape)
    llm_team_runs_v1:  in flight, 19K+ chunks
    llm_team_response_cache_v1: queued

scripts/analyze_chicago_contracts.ts
- Real inference pipeline that picks N high-cost permits with
  named contractors from the raw bucket, queries all 6 contract-
  analysis corpora in parallel via /vectors/search, builds a
  MATRIX CONTEXT preamble, calls Grok 4.1 fast for structured
  staffing analysis, hand-reviews each via observer /review,
  appends to data/_kb/contract_analyses.jsonl.

tests/real-world/scrum_master_pipeline.ts
- MATRIX_CORPORA_FOR_TASK extended with two new task classes:
  contract_analysis (chicago + entity_brief + sec + llm_team_runs
    + llm_team_response_cache + distilled_procedural)
  staffing_inference (workers_500k_v8 + entity_brief + chicago
    + llm_team_runs + distilled_procedural)
  scrum_review unchanged.

This is the first time the matrix architecture operates on real
ingested data instead of code-review smoke tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 18:44:27 -05:00
root
a496ced848 scrum: unified matrix retriever — pull from ALL relevant KB corpora, not just pathway memory
Per J 2026-04-25 architectural correction: matrix index is the vector
indexing layer for the WHOLE knowledge base (distilled facts, procedures,
config hints, team runs, playbooks, pathway successes), not a single
narrow store. Built fetchMatrixContext(query, taskClass, filePath) that:

- Queries multiple persistent vector indexes in parallel via /vectors/search
- Collects hits per corpus + score + doc_id + 400-char excerpt
- Pulls pathway successes via existing helper, mapped to MatrixHit shape
- Sorts by score across corpora, returns top-N (default 8)
- Reports per-corpus hit counts + errors for transparency
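
The merge step in the list above can be sketched as a pure function. The field names on `MatrixHit`, the `CorpusResult` wrapper, and `mergeMatrixHits` itself are assumptions; only the behavior (400-char excerpts, cross-corpus score sort, top-N default 8, per-corpus counts and errors) comes from the description.

```typescript
// Assumed hit shape; the commit only states which pieces of data are collected.
interface MatrixHit {
  source_corpus: string;
  doc_id: string;
  score: number;
  excerpt: string;
}

// Result of querying one corpus (hypothetical wrapper type).
interface CorpusResult {
  corpus: string;
  hits: MatrixHit[];
  error?: string;
}

interface MatrixContext {
  top: MatrixHit[];
  perCorpusCounts: Record<string, number>;
  errors: Record<string, string>;
}

// Merge per-corpus results: truncate excerpts, sort by score, keep the top N.
function mergeMatrixHits(results: CorpusResult[], topN = 8): MatrixContext {
  const perCorpusCounts: Record<string, number> = {};
  const errors: Record<string, string> = {};
  const all: MatrixHit[] = [];
  for (const r of results) {
    perCorpusCounts[r.corpus] = r.hits.length;
    if (r.error !== undefined) errors[r.corpus] = r.error;
    for (const h of r.hits) {
      all.push({ ...h, excerpt: h.excerpt.slice(0, 400) });
    }
  }
  all.sort((a, b) => b.score - a.score); // highest score first, across corpora
  return { top: all.slice(0, topN), perCorpusCounts, errors };
}
```

Sorting raw scores across corpora assumes the indexes produce comparable similarity values; if they do not, a per-corpus normalization step would be needed before the sort.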

Per-task-class corpus list (MATRIX_CORPORA_FOR_TASK):
  scrum_review → distilled_factual, distilled_procedural,
                 distilled_config_hint, kb_team_runs_v1
  (staffing data deliberately excluded — not relevant to code review)

Probed live: distilled_config_hint top hit = 0.52, distilled_procedural
top = 0.49, kb_team_runs top = 0.59. Real signal across corpora.

Replaces the narrow proven-approaches preamble with a unified
MATRIX-INDEXED CONTEXT preamble tagged with source_corpus per chunk
so the model knows what kind of context it's seeing.

LH_SCRUM_MATRIX_RETRIEVE=0 still disables for A/B testing.

Future: promote to a Rust /v1/matrix endpoint once corpora list and
ranking logic stabilize. For now TS lets us iterate fast against the
live matrix without gateway restarts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 18:29:08 -05:00
root
d187bcd8ac scrum: stop cascading models on quality issues — single-model retry with enrichment
Architectural correction (J 2026-04-25):

The 9-rung ladder was treating cascade as the strategy. That's wrong.
ONE model handles the work, with same-model retries using enriched
context. Cycle to a different model ONLY on PROVIDER errors (network
/ auth / 5xx) — never on quality issues, because quality issues mean
the context needs more enrichment, not a different model.

Changes:
- LADDER shrunk from 11 entries to 3 (Grok 4.1 fast primary, DeepSeek
  V4 flash + Qwen3-235B as provider-error fallbacks). Removed Kimi
  K2.6, Gemini 2.5 flash, all Ollama Cloud rungs, OR free-tier rungs,
  local qwen3.5 — none were doing the work, all wasted attempts. They
  remain available as routable tools for the future mode router.
- Loop restructured: separate `modelIdx` from attempt counter.
  Provider error → modelIdx++ (advance fallback). Observer reject /
  cycle / thin response → retry SAME model with rejection notes
  feeding into the `learning` preamble; advance fallback only after
  MAX_QUALITY_RETRIES (default 2) exhausted on the current model.
- LH_SCRUM_MAX_QUALITY_RETRIES env to tune the per-model retry cap.
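
The restructured loop can be sketched as below. This is an illustration under assumptions, not the pipeline's code: `Outcome`, `CallModel`, and `runLadder` are hypothetical names, the model call is synchronous for brevity, and "MAX_QUALITY_RETRIES exhausted" is read as one initial attempt plus up to that many retries on the same model.

```typescript
// Three possible results of one model call (hypothetical modelling).
type Outcome =
  | { kind: "accept"; text: string }
  | { kind: "provider_error" } // network / auth / 5xx
  | { kind: "quality_reject"; notes: string }; // observer reject, cycle, thin response

type CallModel = (model: string, learning: string[]) => Outcome;

interface RunResult {
  accepted: boolean;
  model: string | null;
  attempts: number;
}

function runLadder(ladder: string[], call: CallModel, maxQualityRetries = 2): RunResult {
  let modelIdx = 0; // which fallback is active
  let attempts = 0; // total calls, independent of modelIdx
  let qualityRetries = 0; // retries spent on the current model
  const learning: string[] = []; // rejection notes fed into the next attempt
  while (modelIdx < ladder.length) {
    const model = ladder[modelIdx];
    if (model === undefined) break;
    attempts++;
    const out = call(model, learning);
    if (out.kind === "accept") return { accepted: true, model, attempts };
    if (out.kind === "provider_error") {
      modelIdx++; // provider problem: advance to the next fallback
      qualityRetries = 0;
      continue;
    }
    learning.push(out.notes); // quality problem: enrich context
    if (qualityRetries >= maxQualityRetries) {
      modelIdx++; // retry budget spent on this model
      qualityRetries = 0;
    } else {
      qualityRetries++; // retry the SAME model
    }
  }
  return { accepted: false, model: null, attempts };
}
```

Keeping `attempts` separate from `modelIdx` is what lets a quality rejection consume a retry without moving down the ladder.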

What this preserves:
- Tree-split (treeSplitFile) is still the ONE legitimate model-switch
  trigger for context-overflow, but even it just re-runs the same
  model against smaller chunks.
- Pathway memory preamble still fires.
- Hot-swap reorder still applies — when a recommended model maps to
  the new shorter ladder.

Future direction (J 2026-04-25 note): the LLM Team multi-model modes
in /root/llm_team_ui.py are a REFERENCE PATTERN for a mode router we
will build INSIDE this gateway. Mimic the patterns, don't modify the
LLM Team UI itself. The mode router will pick the right approach for
each task class via the matrix index, not cascade through models.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 18:08:31 -05:00
root
6432465e2c autonomous_loop: stop clobbering applier model/provider defaults
Found by running: the loop was setting LH_APPLIER_MODEL=qwen3-coder:480b
explicitly via env, which clobbered the applier's NEW default of
x-ai/grok-4.1-fast on openrouter. Result: applier kept hitting the
throttled ollama_cloud account and producing zero patches every iter.

Now LOOP_APPLIER_MODEL and LOOP_APPLIER_PROVIDER are optional overrides;
when unset, scrum_applier.ts uses its own defaults.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:54:54 -05:00
root
4ac56564c0 scrum + applier + observer: switch to paid OpenRouter ladder, add Kimi K2.6 + Gemini 2.5
Ollama Cloud was throttled across all 6 cloud rungs in iters 1-9, which
forced the loop into 0-review iterations even though the architecture
was sound. Swapping to paid OpenRouter unblocks the test path.

Ladder changes (top-of-ladder paid models, all under $0.85/M input):
- moonshotai/kimi-k2.6     ($0.74/$4.66, 256K) — capped at 25/hr
- x-ai/grok-4.1-fast       ($0.20/$0.50, 2M)   — primary general
- google/gemini-2.5-flash  ($0.30/$2.50, 1M)   — Google reasoning
- deepseek/deepseek-v4-flash ($0.14/$0.28, 1M) — cheap workhorse
- qwen/qwen3-235b-a22b-2507  ($0.07/$0.10, 262K) — cheapest big
Existing rungs (Ollama Cloud + free OR + local qwen3.5) kept as fallback.

Per-model rate limiter (MODEL_RATE_LIMITS in scrum_master_pipeline.ts):
- Persists call timestamps to data/_kb/rate_limit_calls.jsonl so caps
  survive process restarts (autonomous loop spawns a fresh subprocess
  per iteration; without persistence each iter would reset)
- O(1) writes, prune-on-read for the rolling 1h window
- Capped models log "SKIP (rate-limited: cap N/hr reached)" and the
  ladder cycles to the next rung
- J directive 2026-04-25: 25/hr on Kimi K2.6 to bound output cost

Observer hand-review cloud tier swapped from ollama_cloud/qwen3-coder:480b
to openrouter/x-ai/grok-4.1-fast — proven to emit precise semantic
verdicts (named "AccessControl::can_access() doesn't exist" specifically
in 2026-04-25 tests instead of the heuristic fallback).

Applier patch emitter swapped from ollama_cloud/qwen3-coder:480b to
openrouter/x-ai/grok-4.1-fast (default; LH_APPLIER_MODEL +
LH_APPLIER_PROVIDER override). This was the third LLM call we missed —
without it, observer accepts a review but applier never produces patches
because its emitter was still hitting the throttled account.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:49:02 -05:00
root
e79e51ed70 tests: autonomous_loop.ts — goal-driven scrum + applier retry harness
Wraps tests/real-world/scrum_master_pipeline.ts and scrum_applier.ts in
a single autonomous loop that runs scrum → applier --commit → optional
git push, observes per-iteration outcomes via observer /event, journals
to data/_kb/autonomous_loops.jsonl. Stops when 2 consecutive iters land
zero commits OR LOOP_MAX_ITERS reached.
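
The stop rule can be written as a small predicate. `shouldStop` and its argument shape are hypothetical; the sketch assumes the loop records the number of commits landed by each completed iteration.

```typescript
// True when the loop should end: iteration cap reached, or the last two
// completed iterations both landed zero commits.
function shouldStop(commitsPerIter: number[], maxIters: number): boolean {
  if (commitsPerIter.length >= maxIters) return true;
  const lastTwo = commitsPerIter.slice(-2);
  return lastTwo.length === 2 && lastTwo.every((n) => n === 0);
}
```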

Env knobs:
  LOOP_TARGETS — comma-sep paths, default 3 high-traffic Lakehouse files
  LOOP_MAX_ITERS — default 3
  LOOP_PUSH=1 — push branch after each commit-landing iter
  LOOP_BRANCH — default scrum/auto-apply-19814 (refuses to run elsewhere)
  LOOP_MIN_CONF — applier min confidence (default 85)
  LOOP_APPLIER_MODEL — default qwen3-coder:480b

Causality preserved: targets pass through to LH_APPLIER_FILES so applier
patches what scrum just reviewed (vs picking from global review history).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:32:15 -05:00
root
3f166a5558 scrum + observer: hand-review wire — judgment moved out of the inner loop
Pre-2026-04-25 the scrum_master applied a hardcoded grounding-rate gate
inline. That baked policy into the wrong layer — semantic judgment about
whether a review is grounded belongs in the observer (which has Langfuse
traces, sees every response across the system, and can call cloud LLMs
for real evaluation). Scrum should report DATA, observer DECIDES.

What landed:
- scrum_master_pipeline.ts: removed the inline grounding-pct threshold;
  every accepted candidate now POSTs to observer's /review endpoint with
  {response, source_content, grounding_stats, model, attempt}. Observer
  returns {verdict: accept|reject|cycle, confidence, notes}. On observer
  failure, scrum falls open to accept (observer is policy, not blocker).
- mcp-server/observer.ts: new POST /review endpoint with two-tier
  evaluator. Tier 1: cloud LLM (qwen3-coder:480b at temp=0) hand-reviews
  with full context — response + source excerpt + grounding stats — and
  emits structured verdict JSON. Tier 2: deterministic heuristic over
  grounding pct + total quotes when cloud throttles, marked source:
  "heuristic" so consumers can tune it later by comparing against cloud.
- Every verdict persists to data/_kb/observer_reviews.jsonl with full
  input snapshot so cloud vs heuristic can be A/B compared once cloud
  quota refreshes.
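
The scrum-side fall-open behavior can be sketched as follows. `ObserverReview` mirrors the `{verdict, confidence, notes}` response described above; `reviewOrFallOpen`, the synchronous callback standing in for the POST to /review, and the zero confidence on fall-open are assumptions.

```typescript
type Verdict = "accept" | "reject" | "cycle";

interface ObserverReview {
  verdict: Verdict;
  confidence: number;
  notes: string;
}

// `callObserver` stands in for the POST to /review and throws when the
// observer is unreachable. On failure the candidate is accepted, because the
// observer is policy, not a blocker.
function reviewOrFallOpen(callObserver: () => ObserverReview): ObserverReview {
  try {
    return callObserver();
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e);
    return { verdict: "accept", confidence: 0, notes: `observer unavailable: ${reason}` };
  }
}
```

Recording the failure reason in `notes` keeps fall-open accepts distinguishable from real observer accepts when the journal is audited later.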

Verified end-to-end: smoke loop iter 1 — observer returned `cycle` on
21% grounding (cycled to next rung), `reject` on 17% (gave up). Iter 2
— `reject` on 12% and 14%. Both UNRESOLVED with honest signal instead
of polluting pathway memory with hallucinated patterns.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:32:04 -05:00
root
c90a509f49 applier: LH_APPLIER_FILES env to constrain to current-iter targets
Without this, the applier loaded the latest 34 reviews and patched the
highest-confidence file from history — which is meaningless when called
from the autonomous loop where the intent is "review file X this iter,
patch file X this iter." Now the loop passes its targets through and the
applier filters eligible reviews accordingly.
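
A sketch of the filter, assuming `LH_APPLIER_FILES` is a comma-separated path list (the format used by `LOOP_TARGETS`) and that an unset value keeps the earlier behavior of considering every review. `StoredReview`, `parseTargets`, and `eligibleReviews` are hypothetical names.

```typescript
// Minimal review record (assumed shape).
interface StoredReview {
  file: string;
  confidence: number;
}

// Split a comma-separated list, ignoring blanks and surrounding whitespace.
function parseTargets(raw: string | undefined): string[] {
  return (raw ?? "")
    .split(",")
    .map((s) => s.trim())
    .filter((s) => s.length > 0);
}

// Restrict reviews to the current iteration's targets when any are given.
function eligibleReviews(reviews: StoredReview[], rawTargets: string | undefined): StoredReview[] {
  const targets = parseTargets(rawTargets);
  if (targets.length === 0) return reviews; // unset: consider all reviews
  return reviews.filter((r) => targets.indexOf(r.file) !== -1);
}
```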

Causality is restored: scrum reviews file X → applier patches file X.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:31:49 -05:00
root
9ecc5848fa scrum: blind-response guard + anchor-grounding post-verifier
Two signal-quality fixes for the scrum loop:

1. isBlindResponse() — detects models that emit structurally-valid
   review JSON containing "no source code visible / cannot verify"
   even when source WAS supplied. Rejects so the ladder cycles to
   the next rung instead of accepting the blind hallucination.

2. verifyAnchorGrounding() + appendGroundingFooter() — post-process
   verifier that extracts every backtick-quoted snippet from the
   review and checks it against the original source content.
   Appends a grounding footer reporting grounded vs ungrounded
   counts so humans can audit hallucination rate at a glance.
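
Both checks can be sketched in a few lines. The function names are reused from this commit message, but the bodies are illustrative guesses: the phrase list contains only the two phrases quoted above, grounding is a plain substring match, and the footer format is invented.

```typescript
// Phrases quoted in the commit message; the real list may be longer.
const BLIND_PHRASES = ["no source code visible", "cannot verify"];

// A response is "blind" if it claims it saw no source even though source was supplied.
function isBlindResponse(review: string, sourceSupplied: boolean): boolean {
  if (!sourceSupplied) return false;
  const lower = review.toLowerCase();
  return BLIND_PHRASES.some((p) => lower.indexOf(p) !== -1);
}

interface GroundingStats {
  grounded: number;
  ungrounded: number;
}

// Count backtick-quoted snippets that do / do not appear verbatim in the source.
function verifyAnchorGrounding(review: string, source: string): GroundingStats {
  const stats: GroundingStats = { grounded: 0, ungrounded: 0 };
  const re = /`([^`\n]+)`/g;
  let m: RegExpExecArray | null;
  while ((m = re.exec(review)) !== null) {
    const snippet = m[1] ?? "";
    if (source.indexOf(snippet) !== -1) stats.grounded++;
    else stats.ungrounded++;
  }
  return stats;
}

// Hypothetical footer format.
function appendGroundingFooter(review: string, stats: GroundingStats): string {
  return `${review}\n\n[grounding: ${stats.grounded} grounded, ${stats.ungrounded} ungrounded]`;
}
```

A verbatim substring match is strict: a snippet the model reformatted (different whitespace, for example) counts as ungrounded, so the footer is an upper bound on the hallucination rate rather than an exact measure.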

Born from the iter where llm_team_ui.py review came back with 6/10
findings hallucinated (invented render_template_string calls,
fabricated logger.exception sites, made-up SHA-256 hashing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:07:30 -05:00
root
b843a23433 mcp: contractor entity-brief drill-down + mobile UX pass
Adds /contractor page route plus /intelligence/contractor_profile
endpoint that fans out across OSHA, ticker, history, parent_link,
federal contracts, debarment, NLRB, ILSOS, news, diversity certs,
BLS macro — single per-contractor portfolio view across every
wired source.

search.html: mobile responsive layout, fixed bottom dock with
horizontal scroll-snap, legacy bridge row stacking, viewport
overflow guards.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:07:23 -05:00
root
a6c83b03e5 chore: sync Cargo.lock — toml dep for phase-42 rule loader
Pairs retroactively with de8fb10 (truth/ TOML rule loader).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 17:07:16 -05:00
14 changed files with 4058 additions and 139 deletions

1
Cargo.lock generated

@ -8737,6 +8737,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
]

@ -217,8 +217,39 @@ pub struct PathwayTrace {
/// such that a retired pathway would work again, a new PathwayTrace
/// with a fresh id will be inserted — retirement is per-id.)
pub retired: bool,
// ─── Mem0 versioning + deletion (J 2026-04-25 directive, mirrors
// playbook_memory's Phase 25/27 patterns) ───
/// UUID for THIS specific trace. pathway_id is the bucket key
/// (shared by traces of the same task/file_prefix/signal); trace_uid
/// addresses an individual trace within that bucket so retire/revise
/// can target it precisely. Empty on legacy traces; populated by
/// upsert/insert callers (or filled with a generated UUID on insert).
#[serde(default)]
pub trace_uid: String,
/// Mem0-style version chain. v1 for original traces; bumped on
/// `revise()`. Legacy traces deserialize as version=1 via default.
#[serde(default = "default_version")]
pub version: u32,
/// trace_uid of the trace this one supersedes (None = root version).
#[serde(default)]
pub parent_trace_uid: Option<String>,
/// Set when a newer version supersedes this trace. Excluded from
/// retrieval (hot-swap, bug_fingerprints_for) once set.
#[serde(default)]
pub superseded_at: Option<String>,
/// trace_uid of the new version. Pairs with superseded_at.
#[serde(default)]
pub superseded_by_trace_uid: Option<String>,
/// Human-readable reason recorded with retire(). Pairs with
/// `retired: true`. Empty on probation-driven retirements (those
/// just set retired=true without a textual reason).
#[serde(default)]
pub retirement_reason: Option<String>,
}
fn default_version() -> u32 { 1 }
impl PathwayTrace {
/// Compute the narrow fingerprint id from task_class + file_prefix
/// + signal_class. `file_prefix` is the first path segment
@ -349,6 +380,48 @@ pub struct HotSwapCandidate {
pub recommended_model: String,
}
/// Mem0-style outcome of an upsert. Mirrors playbook_memory::UpsertOutcome
/// but adapts the UPDATE semantic to PathwayTrace's bucket model: there
/// is no notion of merging endorsed_names — each trace is an immutable
/// run record. UPDATE here means "we found a non-retired non-superseded
/// trace with the same workflow shape; bumped its replay_count instead
/// of appending a duplicate." NOOP is reserved for the case where the
/// caller asked for an upsert that would change nothing observable.
#[derive(Debug, Serialize)]
pub enum PathwayUpsertOutcome {
Added { pathway_id: String, trace_uid: String },
Updated { pathway_id: String, trace_uid: String, replay_count: u32 },
Noop { pathway_id: String, trace_uid: String },
}
/// Mem0-style outcome of revise — chains versions across traces.
#[derive(Debug, Serialize)]
pub struct PathwayReviseOutcome {
pub parent_trace_uid: String,
pub parent_version: u32,
pub new_trace_uid: String,
pub new_version: u32,
pub superseded_at: String,
}
/// Compute a stable fingerprint for upsert dedup. Captures the
/// workflow shape: the sequence of (rung, model) pairs from
/// ladder_attempts, plus the final_verdict. Two traces with the same
/// fingerprint represent the same proven approach on the same task —
/// don't store duplicates.
fn workflow_fingerprint(trace: &PathwayTrace) -> String {
let mut h = Sha256::new();
h.update(trace.final_verdict.as_bytes());
h.update(b"|");
for a in &trace.ladder_attempts {
h.update(a.model.as_bytes());
h.update(b":");
h.update(a.rung.to_string().as_bytes());
h.update(b";");
}
format!("{:x}", h.finalize())
}
impl PathwayMemory {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
@ -385,6 +458,10 @@ impl PathwayMemory {
if trace.pathway_vec.is_empty() {
trace.pathway_vec = build_pathway_vec(&trace);
}
if trace.trace_uid.is_empty() {
trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
if trace.version == 0 { trace.version = 1; }
let mut s = self.state.write().await;
s.pathways
.entry(trace.pathway_id.clone())
@ -395,6 +472,222 @@ impl PathwayMemory {
self.persist().await
}
/// Mem0-style upsert. ADD if no existing live trace in the bucket
/// matches this trace's workflow fingerprint. UPDATE (bump
/// replay_count) if a match exists. NOOP is semantically equivalent
/// to UPDATE here — kept for symmetry with playbook_memory and
/// future-proofing if we add merge logic.
///
/// "Live" means: not retired, not superseded.
///
/// Replaces raw `insert` for callers that want dedup. Existing
/// `insert` callers (scrum_master) keep raw-append semantics so
/// behavior is back-compat.
pub async fn upsert(&self, mut trace: PathwayTrace) -> Result<PathwayUpsertOutcome, String> {
if trace.pathway_vec.is_empty() {
trace.pathway_vec = build_pathway_vec(&trace);
}
if trace.trace_uid.is_empty() {
trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
if trace.version == 0 { trace.version = 1; }
let new_fp = workflow_fingerprint(&trace);
let mut s = self.state.write().await;
let bucket = s.pathways.entry(trace.pathway_id.clone()).or_default();
// Find a live trace (not retired, not superseded) with same workflow.
let mut existing_idx: Option<usize> = None;
for (i, t) in bucket.iter().enumerate() {
if t.retired { continue; }
if t.superseded_at.is_some() { continue; }
if workflow_fingerprint(t) == new_fp {
existing_idx = Some(i);
break;
}
}
let pathway_id = trace.pathway_id.clone();
let outcome = match existing_idx {
None => {
let trace_uid = trace.trace_uid.clone();
bucket.push(trace);
PathwayUpsertOutcome::Added { pathway_id, trace_uid }
}
Some(i) => {
// UPDATE: bump replay counters on the existing trace
// instead of duplicating. Replays_succeeded only bumps
// on accepted final_verdict (mirror record_replay logic).
let existing = &mut bucket[i];
existing.replay_count = existing.replay_count.saturating_add(1);
if trace.final_verdict == "accepted" {
existing.replays_succeeded = existing.replays_succeeded.saturating_add(1);
}
PathwayUpsertOutcome::Updated {
pathway_id,
trace_uid: existing.trace_uid.clone(),
replay_count: existing.replay_count,
}
}
};
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
Ok(outcome)
}
/// Mem0-style retire. Marks a specific trace (by trace_uid) retired
/// with a human-readable reason. Retired traces are excluded from
/// hot-swap and bug_fingerprints retrieval. Idempotent: retiring an
/// already-retired trace returns Ok(false) without modification.
pub async fn retire(&self, trace_uid: &str, reason: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut s = self.state.write().await;
'outer: for traces in s.pathways.values_mut() {
for t in traces.iter_mut() {
if t.trace_uid == trace_uid && !t.retired {
t.retired = true;
t.retirement_reason = Some(reason.to_string());
touched = true;
break 'outer;
}
}
}
if touched {
s.last_updated_at = Utc::now().timestamp_millis();
}
}
if touched { self.persist().await?; }
Ok(touched)
}
/// Mem0-style revise. Supersedes parent trace, chains the new
/// version. New version inherits parent_trace_uid; parent gets
/// superseded_at + superseded_by_trace_uid stamped. Rejects if
/// parent is retired or already superseded (revise the tip, not
/// the middle of the chain).
pub async fn revise(
&self,
parent_trace_uid: &str,
mut new_trace: PathwayTrace,
) -> Result<PathwayReviseOutcome, String> {
let now = Utc::now().to_rfc3339();
if new_trace.pathway_vec.is_empty() {
new_trace.pathway_vec = build_pathway_vec(&new_trace);
}
if new_trace.trace_uid.is_empty() {
new_trace.trace_uid = uuid::Uuid::new_v4().to_string();
}
let mut s = self.state.write().await;
// Locate parent across all buckets
let mut parent_loc: Option<(String, usize)> = None;
for (bucket_key, traces) in s.pathways.iter() {
for (i, t) in traces.iter().enumerate() {
if t.trace_uid == parent_trace_uid {
parent_loc = Some((bucket_key.clone(), i));
break;
}
}
if parent_loc.is_some() { break; }
}
let (parent_bucket, parent_idx) = parent_loc
.ok_or_else(|| format!("parent trace_uid '{parent_trace_uid}' not found"))?;
// Validate parent state
{
let parent = &s.pathways[&parent_bucket][parent_idx];
if parent.retired {
return Err(format!(
"cannot revise retired trace '{parent_trace_uid}' — retirement is terminal"
));
}
if parent.superseded_at.is_some() {
return Err(format!(
"trace '{parent_trace_uid}' already superseded; revise the tip of the chain"
));
}
}
let parent_version = s.pathways[&parent_bucket][parent_idx].version;
let new_version = parent_version.saturating_add(1);
let new_uid = new_trace.trace_uid.clone();
new_trace.version = new_version;
new_trace.parent_trace_uid = Some(parent_trace_uid.to_string());
new_trace.superseded_at = None;
new_trace.superseded_by_trace_uid = None;
// Stamp parent
{
let parent_mut = &mut s.pathways.get_mut(&parent_bucket).unwrap()[parent_idx];
parent_mut.superseded_at = Some(now.clone());
parent_mut.superseded_by_trace_uid = Some(new_uid.clone());
}
// Append new version (same bucket if same pathway_id)
s.pathways
.entry(new_trace.pathway_id.clone())
.or_default()
.push(new_trace);
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
Ok(PathwayReviseOutcome {
parent_trace_uid: parent_trace_uid.to_string(),
parent_version,
new_trace_uid: new_uid,
new_version,
superseded_at: now,
})
}
/// Walk the version chain containing trace_uid. Returns root→tip.
/// Empty if trace_uid not found. Cycle-safe.
pub async fn history(&self, trace_uid: &str) -> Vec<PathwayTrace> {
let s = self.state.read().await;
// Build trace_uid → trace map across all buckets
let mut by_uid: HashMap<String, PathwayTrace> = HashMap::new();
for traces in s.pathways.values() {
for t in traces {
if !t.trace_uid.is_empty() {
by_uid.insert(t.trace_uid.clone(), t.clone());
}
}
}
let Some(seed) = by_uid.get(trace_uid).cloned() else {
return Vec::new();
};
// Walk back to root
let mut visited: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut root = seed.clone();
while let Some(parent_id) = root.parent_trace_uid.clone() {
if !visited.insert(parent_id.clone()) { break; }
match by_uid.get(&parent_id) {
Some(p) => root = p.clone(),
None => break,
}
}
// Walk forward to tip
let mut chain = vec![root.clone()];
let mut visited_fwd: std::collections::HashSet<String> = std::collections::HashSet::new();
visited_fwd.insert(root.trace_uid.clone());
let mut current = root;
while let Some(succ) = current.superseded_by_trace_uid.clone() {
if !visited_fwd.insert(succ.clone()) { break; }
match by_uid.get(&succ) {
Some(n) => {
chain.push(n.clone());
current = n.clone();
}
None => break,
}
}
chain
}
/// Query for a hot-swap candidate. Returns `None` if no eligible
/// pathway exists — caller should run the full ladder. Returns
/// `Some(cand)` if all gates pass — caller can short-circuit to
@ -422,6 +715,11 @@ impl PathwayMemory {
if p.retired {
continue;
}
// Mem0 versioning: superseded traces are excluded from
// retrieval — only the tip of each version chain counts.
if p.superseded_at.is_some() {
continue;
}
// audit_consensus gate: explicit FAIL blocks hot-swap. A null
// audit_consensus (auditor hasn't seen this pathway yet) is
// NOT a block — the success_rate gate below still requires
@ -523,6 +821,11 @@ impl PathwayMemory {
// are semantically equivalent within a pattern_key by design).
let mut agg: HashMap<(String, String), (SemanticFlag, String, u32)> = HashMap::new();
for t in traces {
// Mem0 versioning: skip retired + superseded traces so
// their bug patterns don't leak into future retrievals.
if t.retired || t.superseded_at.is_some() {
continue;
}
for bp in &t.bug_fingerprints {
let key = (format!("{:?}", bp.flag), bp.pattern_key.clone());
let entry = agg.entry(key).or_insert_with(|| {

@ -157,6 +157,11 @@ pub fn router(state: VectorState) -> Router {
.route("/pathway/stats", get(pathway_stats))
// ADR-021 Phase C: pre-review bug-fingerprint retrieval.
.route("/pathway/bug_fingerprints", post(pathway_bug_fingerprints))
// Mem0 ops (J 2026-04-25): upsert/retire/revise/history.
.route("/pathway/upsert", post(pathway_upsert))
.route("/pathway/retire", post(pathway_retire))
.route("/pathway/revise", post(pathway_revise))
.route("/pathway/history/{trace_uid}", get(pathway_history))
.with_state(state)
}
@ -2904,6 +2909,58 @@ async fn pathway_bug_fingerprints(
Json(json!({ "fingerprints": fps }))
}
// ─── Mem0 ops endpoints (J 2026-04-25) ───
async fn pathway_upsert(
State(state): State<VectorState>,
Json(trace): Json<pathway_memory::PathwayTrace>,
) -> impl IntoResponse {
match state.pathway_memory.upsert(trace).await {
Ok(outcome) => Ok(Json(json!({"ok": true, "outcome": outcome}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct PathwayRetireRequest {
trace_uid: String,
reason: String,
}
async fn pathway_retire(
State(state): State<VectorState>,
Json(req): Json<PathwayRetireRequest>,
) -> impl IntoResponse {
match state.pathway_memory.retire(&req.trace_uid, &req.reason).await {
Ok(touched) => Ok(Json(json!({"ok": true, "retired": touched}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Deserialize)]
struct PathwayReviseRequest {
parent_trace_uid: String,
new_trace: pathway_memory::PathwayTrace,
}
async fn pathway_revise(
State(state): State<VectorState>,
Json(req): Json<PathwayReviseRequest>,
) -> impl IntoResponse {
match state.pathway_memory.revise(&req.parent_trace_uid, req.new_trace).await {
Ok(outcome) => Ok(Json(json!({"ok": true, "outcome": outcome}))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
async fn pathway_history(
State(state): State<VectorState>,
axum::extract::Path(trace_uid): axum::extract::Path<String>,
) -> impl IntoResponse {
let chain = state.pathway_memory.history(&trace_uid).await;
Json(json!({"trace_uid": trace_uid, "chain_len": chain.len(), "chain": chain}))
}
#[cfg(test)]
mod extractor_tests {
use super::*;


@ -18,6 +18,7 @@ import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { z } from "zod";
import { startTrace, logSpan, logGeneration, scoreTrace, flush as flushTraces } from "./tracing.js";
import { buildPermitBrief } from "./entity.js";
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const PORT = parseInt(process.env.MCP_PORT || "3700");
@ -960,6 +961,61 @@ async function main() {
return new Response(Bun.file(import.meta.dir + "/console.html"));
}
// ─── Contractor / entity drill-down page ───
// Single-contractor portfolio view across every wired source:
// OSHA national, Chicago history, ticker chart, parent link,
// federal contracts, debarment, unions, training. Click any
// contractor name in a permit Entity Brief to land here.
if (url.pathname === "/contractor") {
return new Response(Bun.file(import.meta.dir + "/contractor.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
if (url.pathname === "/intelligence/contractor_profile" && req.method === "POST") {
const start = Date.now();
try {
const b = (await req.json().catch(() => ({}))) as { name?: string };
if (!b.name) return err("missing name", 400);
// Use the entity-brief library directly — single entity, all sources.
const { fetchOshaBrief, fetchTickerBrief, fetchContractorHistory, fetchParentLink, fetchFederalContracts, fetchDebarmentBrief, fetchNlrbBriefReal, fetchIlsosBrief, fetchNewsMentions, fetchDiversityCerts, scoreNewsSentiment, fetchBlsConstructionTrend, normalizeEntityName, entityTicker } = await import("./entity.js");
const [osha, stock, history, parent_link, federal, debarment, nlrb, ilsos, news, diversity, macro] = await Promise.all([
fetchOshaBrief(b.name),
fetchTickerBrief(b.name),
fetchContractorHistory(b.name),
fetchParentLink(b.name),
fetchFederalContracts(b.name),
fetchDebarmentBrief(b.name),
fetchNlrbBriefReal(b.name),
fetchIlsosBrief(b.name),
fetchNewsMentions(b.name),
fetchDiversityCerts(b.name),
fetchBlsConstructionTrend(),
]);
const news_sentiment = news ? scoreNewsSentiment(news) : null;
return ok({
key: normalizeEntityName(b.name),
display_name: b.name,
ticker: entityTicker(b.name),
osha,
stock,
history,
parent_link,
federal,
debarment,
nlrb,
ilsos,
news,
news_sentiment,
diversity,
macro,
generated_at: new Date().toISOString(),
duration_ms: Date.now() - start,
});
} catch (e: any) {
return err(`contractor_profile: ${e.message}`, 500);
}
}
// Intelligence: Market data: public building permits → staffing demand forecast
if (url.pathname === "/intelligence/market" && req.method === "POST") {
const start = Date.now();
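
The `/intelligence/contractor_profile` handler above awaits all eleven sources through a single `Promise.all`, so if any one fetcher rejects, the `catch` turns the whole profile into a 500. Whether the `entity.js` fetchers can reject is not visible in this diff; they may already catch internally. Assuming they can, one possible alternative is to give each source its own catch so a failing source becomes `null` and the rest of the profile still returns. The helper name `settleAll` and the `errors` map are hypothetical.

```typescript
// Hypothetical helper: await a record of named source promises, mapping any
// rejected source to null and recording its error message under the same key.

interface Settled {
  values: Record<string, unknown>;
  errors: Record<string, string>;
}

async function settleAll(sources: Record<string, Promise<unknown>>): Promise<Settled> {
  const results = await Promise.all(
    Object.entries(sources).map(async ([key, pending]) => {
      try {
        return { key, value: await pending, error: null as string | null };
      } catch (e) {
        // Per-source catch: one failure no longer rejects the outer Promise.all.
        return { key, value: null as unknown, error: e instanceof Error ? e.message : String(e) };
      }
    }),
  );
  const out: Settled = { values: {}, errors: {} };
  for (const r of results) {
    out.values[r.key] = r.value;
    if (r.error !== null) out.errors[r.key] = r.error;
  }
  return out;
}

// Usage inside the handler might look like this (fetchers as imported above):
//   const { values, errors } = await settleAll({
//     osha: fetchOshaBrief(b.name),
//     stock: fetchTickerBrief(b.name),
//   });
//   return ok({ ...values, source_errors: errors });
```

The trade-off is that callers must now tolerate `null` for any source, which the response shape above already does for `news_sentiment`.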
@ -1232,8 +1288,12 @@ async function main() {
const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
// Recent + substantial permits only; skip tiny ones that
// don't imply real staffing demand.
// Include contact_1 + contact_2 fields so the Entity Brief
// panel on each card can populate without a second fetch.
// Contacts identify the applicant / contractor by name —
// those are the keys we pass to OSHA/ILSOS enrichment.
const permits: any[] = await fetch(
`${permitUrl}?$select=permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date&` `${permitUrl}?$select=id,permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date,contact_1_name,contact_1_type,contact_2_name,contact_2_type&`
+ `$where=reported_cost>250000 AND issue_date>'2025-06-01'`
+ `&$order=issue_date DESC&$limit=6`
).then(r => r.json()).catch(() => []);
@ -1367,12 +1427,19 @@ async function main() {
contracts.push({
permit: {
id: p.id,
cost,
work_type: p.work_type || "General construction",
description: (p.work_description || "").substring(0, 140),
address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(),
community_area: p.community_area,
issue_date: (p.issue_date || "").substring(0, 10),
// Contacts — used by /intelligence/permit_entities to
// enrich each card with OSHA + ILSOS on expand.
contact_1_name: p.contact_1_name || "",
contact_1_type: p.contact_1_type || "",
contact_2_name: p.contact_2_name || "",
contact_2_type: p.contact_2_type || "",
},
implied_bill_rate: contractBillRate,
timeline: {
@ -1426,6 +1493,58 @@ async function main() {
}
}
// Intelligence: per-permit entity brief — OSHA + ILSOS + property
// Takes a permit identifier (we look it up from Chicago Socrata) or
// raw contact fields directly from the client. Returns an "ETF
// basket" shape: property + entities + per-entity risk factors.
// OSHA is live-scraped (cached 30d). ILSOS returns a structured
// placeholder because apps.ilsos.gov blocks our ASN.
if (url.pathname === "/intelligence/permit_entities" && req.method === "POST") {
const start = Date.now();
try {
const b = await req.json().catch(() => ({})) as {
permit_id?: string;
address?: string;
work_type?: string;
contact_1_name?: string;
contact_1_type?: string;
contact_2_name?: string;
contact_2_type?: string;
fetch_osha?: boolean;
fetch_ilsos?: boolean;
};
// If the caller didn't pass contact fields but did pass a
// permit_id, go pull the record from Chicago Socrata.
let permit = b;
if (b.permit_id && !b.contact_1_name) {
const u = `https://data.cityofchicago.org/resource/ydr8-5enu.json?$where=id='${encodeURIComponent(b.permit_id)}'`;
const rows = (await fetch(u).then((r) => r.json())) as any[];
const p = rows?.[0];
if (p) {
const addr = [p.street_number, p.street_direction, p.street_name]
.filter(Boolean)
.join(" ");
permit = {
permit_id: b.permit_id,
address: addr,
work_type: p.work_type,
contact_1_name: p.contact_1_name,
contact_1_type: p.contact_1_type,
contact_2_name: p.contact_2_name,
contact_2_type: p.contact_2_type,
};
}
}
const brief = await buildPermitBrief(permit, {
fetchOsha: b.fetch_osha !== false,
fetchIlsos: b.fetch_ilsos !== false,
});
return ok({ ...brief, duration_ms: Date.now() - start });
} catch (e: any) {
return err(`permit_entities: ${e.message}`, 500);
}
}
// Removed 2026-04-20: /intelligence/learn was a legacy CSV writer
// that destructively re-wrote successful_playbooks. /log and
// /log_failure replace it cleanly via /vectors/playbook_memory/seed
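
The `/intelligence/permit_entities` handler above looks a permit up by interpolating `permit_id` into a quoted SoQL literal. `encodeURIComponent` does not escape a single quote, so an id containing `'` would end the literal early. A minimal sketch of one way to build that lookup URL follows. It assumes SoQL string literals escape a single quote by doubling it, and the helper names (`soqlString`, `permitLookupUrl`) and the added `$limit=1` are hypothetical, not part of the diff.

```typescript
// Assumption: SoQL string literals escape a single quote by doubling it.
function soqlString(value: string): string {
  return `'${value.replace(/'/g, "''")}'`;
}

// Builds the single-permit lookup URL for the dataset used in this diff.
// URLSearchParams percent-encodes the whole clause, including quotes and "=".
function permitLookupUrl(permitId: string): string {
  const base = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
  const params = new URLSearchParams({
    $where: `id=${soqlString(permitId)}`,
    $limit: "1", // hypothetical addition: only rows[0] is read by the handler
  });
  return `${base}?${params.toString()}`;
}
```

Encoding the full `$where` clause through `URLSearchParams`, rather than only the id, keeps the quoting and the URL escaping as two separate steps.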


@ -224,6 +224,170 @@ async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: Observe
// persists across cycles.
const escalatedSigHashes = new Set<string>();
// ─── Hand-review for scrum/agent candidate responses (2026-04-25) ───
//
// Observer is OUTSIDE the scrum loop's epistemic scope, so its verdict
// can be treated as truth about whether a candidate review is grounded.
// Two-tier evaluator:
// 1. Try cloud LLM (x-ai/grok-4.1-fast via OpenRouter): semantic judgment with
// response + source excerpt + grounding stats as context.
// 2. On cloud failure (throttle/timeout) → deterministic heuristic
// over grounding_pct + total_quotes. Marked source: "heuristic"
// so consumers can tell which rung produced the verdict.
// Every verdict is persisted to data/_kb/observer_reviews.jsonl.
const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";
interface HandReviewInput {
file_path: string;
model: string;
response: string;
source_content: string;
grounding_stats: { total: number; grounded: number; groundedPct: number | null };
attempt: number;
}
interface HandReviewVerdict {
verdict: "accept" | "reject" | "cycle";
confidence: number;
notes: string;
source: "cloud" | "heuristic";
}
async function handReview(input: HandReviewInput): Promise<HandReviewVerdict> {
const t0 = Date.now();
let verdict: HandReviewVerdict;
try {
verdict = await cloudHandReview(input);
} catch (e) {
console.error(`[observer/review] cloud failed (${(e as Error).message}); using heuristic`);
verdict = heuristicHandReview(input);
}
// Persist regardless of source so we can later compare cloud vs
// heuristic verdicts on the same input and tune the heuristic.
const row = {
ts: new Date().toISOString(),
file_path: input.file_path,
model: input.model,
attempt: input.attempt,
response_chars: input.response.length,
grounding_stats: input.grounding_stats,
verdict: verdict.verdict,
confidence: verdict.confidence,
notes: verdict.notes,
source: verdict.source,
duration_ms: Date.now() - t0,
};
try {
const { appendFile } = await import("node:fs/promises");
await appendFile(OBSERVER_REVIEWS, JSON.stringify(row) + "\n");
} catch { /* best-effort persistence */ }
return verdict;
}
async function cloudHandReview(input: HandReviewInput): Promise<HandReviewVerdict> {
const grounded = input.grounding_stats.grounded;
const total = input.grounding_stats.total;
const pct = input.grounding_stats.groundedPct;
// Truncate to keep the prompt under typical context windows.
// 2000 + 4000 = ~6000 chars ≈ 1500 tokens, plus response context.
const responseExcerpt = input.response.slice(0, 2000);
const sourceExcerpt = input.source_content.slice(0, 4000);
const prompt = `You are a code-review quality observer. Decide whether the following automated review is grounded in the actual source — not invented, not hallucinated.
FILE: ${input.file_path}
MODEL: ${input.model}
ATTEMPT: ${input.attempt}
ANCHOR GROUNDING: ${grounded}/${total} backtick-quoted snippets matched the source verbatim${pct !== null ? ` (${pct}%)` : ""}
REVIEW (first 2000 chars):
\`\`\`
${responseExcerpt}
\`\`\`
SOURCE EXCERPT (first 4000 chars):
\`\`\`
${sourceExcerpt}
\`\`\`
Respond ONLY with a JSON object:
{
"verdict": "accept" | "reject" | "cycle",
"confidence": 0-100,
"notes": "<1-2 sentences on what makes this grounded or hallucinated>"
}
- accept: review references real symbols/lines in source; findings could be acted on.
- reject: review invents APIs, fabricates calls, contradicts source. Do NOT record.
- cycle: review is mediocre (partially grounded but wrong shape); try a stronger model.`;
// Hand-review uses paid OpenRouter so it sidesteps the Ollama Cloud
// throttle that drove every prior iter into the heuristic fallback.
// Grok 4.1 fast: $0.20 in / $0.50 out per M tokens, 2M ctx. A typical
// hand-review (~6K input + 300 output) costs ~$0.0014. Selected via
// J directive 2026-04-25 ("best model under $0.72/M").
const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
provider: "openrouter",
model: "x-ai/grok-4.1-fast",
messages: [{ role: "user", content: prompt }],
max_tokens: 300,
temperature: 0.0,
}),
signal: AbortSignal.timeout(45000),
});
if (!resp.ok) {
throw new Error(`/v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
}
const j: any = await resp.json();
const content = (j?.choices?.[0]?.message?.content ?? "").trim();
// Pull JSON object from the response — model may wrap it in prose.
const m = content.match(/\{[\s\S]*\}/);
if (!m) throw new Error(`no JSON object in response: ${content.slice(0, 100)}`);
const parsed = JSON.parse(m[0]);
const v = String(parsed.verdict ?? "accept").toLowerCase();
return {
verdict: (v === "reject" || v === "cycle") ? v as "reject" | "cycle" : "accept",
confidence: Number(parsed.confidence ?? 50),
notes: String(parsed.notes ?? "").slice(0, 500),
source: "cloud",
};
}
function heuristicHandReview(input: HandReviewInput): HandReviewVerdict {
// Deterministic fallback when cloud is throttled. Conservative:
// only flip to reject when the evidence is overwhelming, otherwise
// accept (fall-open principle — observer is policy, not blocker).
const total = input.grounding_stats.total;
const pct = input.grounding_stats.groundedPct;
const respLen = input.response.length;
// Too short to be a real review
if (respLen < 1500) {
return { verdict: "reject", confidence: 80, notes: `response too short (${respLen} chars)`, source: "heuristic" };
}
// Below 5 quotes — not enough signal to judge grounding; accept
if (total < 5 || pct === null) {
return { verdict: "accept", confidence: 50, notes: `insufficient quote signal (${total} quotes); accepting`, source: "heuristic" };
}
// Very heavy hallucination
if (pct < 20) {
return { verdict: "reject", confidence: 85, notes: `low grounding (${pct}% of ${total} quotes)`, source: "heuristic" };
}
// Mediocre — cycle to a stronger model
if (pct < 50) {
return { verdict: "cycle", confidence: 65, notes: `mediocre grounding (${pct}% of ${total} quotes); try stronger`, source: "heuristic" };
}
// Good enough
return { verdict: "accept", confidence: 75, notes: `grounding ${pct}% of ${total} quotes`, source: "heuristic" };
}
async function maybeEscalate(failures: ObservedOp[]) {
// Group failures by sig_hash
const bySig = new Map<string, ObservedOp[]>();
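
`cloudHandReview` above pulls the verdict out of the model reply with the greedy pattern `/\{[\s\S]*\}/`, which spans from the first `{` to the last `}`. If the model adds prose containing braces after the JSON object, the match includes that prose and `JSON.parse` throws, which drops the review to the heuristic rung. A sketch of a stricter extraction is below. `firstJsonObject` and `parseVerdict` are hypothetical names, and the clamp of `confidence` to 0-100 is an assumption based on the range named in the prompt.

```typescript
type Verdict = "accept" | "reject" | "cycle";

interface ParsedVerdict {
  verdict: Verdict;
  confidence: number;
  notes: string;
}

// Returns the first balanced {...} span in the text, ignoring braces that
// sit inside JSON string literals. Returns null when no balanced span exists.
function firstJsonObject(text: string): string | null {
  const start = text.indexOf("{");
  if (start < 0) return null;
  let depth = 0;
  let inString = false;
  let escaped = false;
  for (let i = start; i < text.length; i++) {
    const ch = text[i];
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === "\\") escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === "{") depth++;
    else if (ch === "}") {
      depth--;
      if (depth === 0) return text.slice(start, i + 1);
    }
  }
  return null;
}

// Same defaults as cloudHandReview (unknown verdict → accept, missing
// confidence → 50), plus a clamp so a non-numeric or out-of-range value
// cannot leak through as NaN or 150.
function parseVerdict(content: string): ParsedVerdict {
  const raw = firstJsonObject(content);
  if (raw === null) throw new Error("no JSON object in response");
  const parsed = JSON.parse(raw) as Record<string, unknown>;
  const v = String(parsed.verdict ?? "accept").toLowerCase();
  const conf = Number(parsed.confidence ?? 50);
  return {
    verdict: v === "reject" || v === "cycle" ? v : "accept",
    confidence: Number.isFinite(conf) ? Math.min(100, Math.max(0, conf)) : 50,
    notes: String(parsed.notes ?? "").slice(0, 500),
  };
}
```

This still throws when the first brace in the reply opens something that is not JSON, so the existing fallback to the heuristic remains the safety net.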
@ -362,6 +526,28 @@ function startHttpListener() {
.map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
}));
}
// ─── Hand-review endpoint (2026-04-25) ───
// scrum/agent posts a candidate response + source content + grounding
// stats. Observer evaluates via cloud LLM (x-ai/grok-4.1-fast via OpenRouter) with
// semantic context and returns {verdict, confidence, notes}. On
// cloud throttle, falls back to a deterministic heuristic over the
// grounding stats so the loop keeps moving with honest signal.
//
// This is the policy layer scrum was missing — pre-2026-04-25 the
// scrum_master applied a hardcoded grounding-rate threshold inline,
// which baked judgment into the wrong layer. Now scrum reports data
// (response + source + stats) and observer decides accept/reject/cycle.
if (req.method === "POST" && url.pathname === "/review") {
return req.json().then((body: any) => handReview(body))
.then((verdict) => new Response(JSON.stringify(verdict), {
headers: { "content-type": "application/json" },
}))
.catch((e: Error) =>
new Response(JSON.stringify({ verdict: "accept", confidence: 50, notes: `observer error: ${e.message}`, source: "heuristic" }), {
status: 200, // fall-open shape — scrum keeps moving on observer failure
headers: { "content-type": "application/json" },
}));
}
if (req.method === "POST" && url.pathname === "/event") {
return req.json().then((body: any) => {
const op: ObservedOp = {
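
The `/review` endpoint above is fall-open: on an observer error it still answers 200 with an `accept` verdict, and that error body may omit fields a caller expects, such as `confidence`. A caller can therefore normalize whatever comes back before using it. The sketch below is one way to do that. `normalizeReview` and `requestReview` are hypothetical names, and the defaults (`accept`, 50, `"fallthrough"`) are assumptions chosen to match the fallback values used by `observerReview` in `analyze_chicago_contracts.ts`.

```typescript
interface ReviewVerdict {
  verdict: "accept" | "reject" | "cycle";
  confidence: number;
  notes: string;
  source: string;
}

// Coerces an arbitrary JSON value into a complete verdict, filling any
// missing or malformed field with a fall-open default.
function normalizeReview(raw: unknown): ReviewVerdict {
  const o = (typeof raw === "object" && raw !== null ? raw : {}) as Record<string, unknown>;
  const v = typeof o.verdict === "string" ? o.verdict : "";
  const c = o.confidence;
  const n = o.notes;
  const s = o.source;
  return {
    verdict: v === "reject" || v === "cycle" ? v : "accept",
    confidence: typeof c === "number" && Number.isFinite(c) ? c : 50,
    notes: typeof n === "string" ? n : "",
    source: typeof s === "string" ? s : "fallthrough",
  };
}

// Posts a review payload to the observer; observerUrl is supplied by the caller.
// Any transport or HTTP failure degrades to an accept verdict, never a throw.
async function requestReview(observerUrl: string, payload: Record<string, unknown>): Promise<ReviewVerdict> {
  try {
    const r = await fetch(`${observerUrl}/review`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(payload),
    });
    if (!r.ok) return normalizeReview({ notes: `observer ${r.status}` });
    return normalizeReview(await r.json());
  } catch (e) {
    return normalizeReview({ notes: `observer error: ${e instanceof Error ? e.message : String(e)}` });
  }
}
```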

File diff suppressed because it is too large


@ -0,0 +1,259 @@
#!/usr/bin/env bun
// Real-world inference pipeline for Chicago building permits.
// Uses the unified matrix retriever (chicago_permits + entity_brief +
// sec_tickers + llm_team_runs + distilled_procedural) to enrich a
// Grok 4.1 fast analysis. Observer hand-reviews each result.
//
// First true USE of the matrix architecture on real ingested data —
// not the scrum self-improvement loop, the staffing intelligence loop.
//
// Usage:
// bun run scripts/analyze_chicago_contracts.ts [N]
// N = number of permits to analyze (default 5)
const GATEWAY = process.env.LAKEHOUSE_URL ?? "http://localhost:3100";
const OBSERVER = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
const RAW_BUCKET = "raw";
const MC_ALIAS = "local";
const STAGE_DIR = "/tmp/chicago_analyze";
const OUTPUT = "/home/profit/lakehouse/data/_kb/contract_analyses.jsonl";
const CONTRACT_CORPORA = [
"chicago_permits_v1",
"entity_brief_v1",
"sec_tickers_v1",
"llm_team_runs_v1",
"llm_team_response_cache_v1",
"distilled_procedural_v20260423102847",
];
interface Permit {
permit_?: string;
permit_type?: string;
permit_status?: string;
work_description?: string;
reported_cost?: string | number;
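// Socrata returns flat contact_N_name fields; contact_1/contact_2 are kept
// only for older dumps that nested the contact.
contact_1_name?: string;
contact_2_name?: string;
contact_1?: any;
contact_2?: any;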
contact_3_name?: string;
street_number?: string;
street_direction?: string;
street_name?: string;
suffix?: string;
issue_date?: string;
community_area?: string;
ward?: string;
[k: string]: any;
}
interface MatrixHit {
source_corpus: string;
score: number;
doc_id: string;
text: string;
}
function log(msg: string) { console.log(`[contract ${new Date().toISOString().slice(11,19)}] ${msg}`); }
async function fetchPermits(n: number): Promise<Permit[]> {
const fs = await import("node:fs/promises");
await fs.mkdir(STAGE_DIR, { recursive: true });
const local = `${STAGE_DIR}/permits.json`;
const proc = Bun.spawn(["mc", "cp", "-q", `${MC_ALIAS}/${RAW_BUCKET}/chicago/permits_2026-04-25.json`, local]);
await proc.exited;
const all: Permit[] = JSON.parse(await Bun.file(local).text());
// Pick high-cost permits with named contractors — most interesting for staffing analysis.
// Field is `contact_1_name`, not `contact_1`. reported_cost is integer-like string.
const meaningful = all.filter(p =>
p.reported_cost && Number(p.reported_cost) >= 100000 &&
(p.contact_1_name || p.contact_2_name)
);
log(`raw permits: ${all.length} · meaningful (cost >= $100k + has contractor): ${meaningful.length}`);
// Sample evenly across the meaningful set
const sampled: Permit[] = [];
const stride = Math.max(1, Math.floor(meaningful.length / n));
for (let i = 0; i < meaningful.length && sampled.length < n; i += stride) {
sampled.push(meaningful[i]);
}
return sampled;
}
function permitToText(p: Permit): string {
const addr = `${p.street_number ?? ""} ${p.street_direction ?? ""} ${p.street_name ?? ""} ${p.suffix ?? ""}`.replace(/\s+/g, " ").trim();
const c1 = (p as any).contact_1_name ?? (typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? ""));
const c2 = (p as any).contact_2_name ?? (typeof p.contact_2 === "string" ? p.contact_2 : (p.contact_2?.name ?? ""));
return [
`Chicago Building Permit ${p.permit_ ?? "?"}`,
`Type: ${p.permit_type ?? "?"} · Status: ${p.permit_status ?? "?"}`,
`Address: ${addr} · Community ${p.community_area ?? "?"} · Ward ${p.ward ?? "?"}`,
`Issued: ${p.issue_date ?? "?"}`,
`Reported cost: $${Number(p.reported_cost ?? 0).toLocaleString()}`,
`Primary contractor: ${c1 || "unknown"}`,
c2 ? `Secondary: ${c2}` : "",
`Owner: ${p.contact_3_name ?? "?"}`,
`Work description: ${(p.work_description ?? "").slice(0, 800)}`,
].filter(Boolean).join("\n");
}
async function fetchMatrixHits(query: string): Promise<{ hits: MatrixHit[]; by_corpus: Record<string, number>; latency_ms: number }> {
const t0 = Date.now();
const all: MatrixHit[] = [];
const byCorpus: Record<string, number> = {};
await Promise.all(CONTRACT_CORPORA.map(async (idx) => {
try {
const r = await fetch(`${GATEWAY}/vectors/search`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ index_name: idx, query, top_k: 3 }),
signal: AbortSignal.timeout(15000),
});
if (!r.ok) { byCorpus[idx] = -1; return; }
const data: any = await r.json();
const results = data.results ?? [];
byCorpus[idx] = results.length;
for (const h of results) {
all.push({
source_corpus: idx,
score: Number(h.score ?? 0),
doc_id: String(h.doc_id ?? "?"),
text: String(h.chunk_text ?? "").slice(0, 400),
});
}
} catch { byCorpus[idx] = -1; }
}));
all.sort((a, b) => b.score - a.score);
return { hits: all.slice(0, 10), by_corpus: byCorpus, latency_ms: Date.now() - t0 };
}
function buildMatrixPreamble(hits: MatrixHit[]): string {
if (hits.length === 0) return "";
const lines = [
`═══ 📖 MATRIX CONTEXT — ${hits.length} relevant hits across the knowledge base ═══`,
"Reference material from prior contractor data, SEC tickers, LLM team analyses, and distilled procedures. Use as evidence; do NOT invent.",
"",
];
for (let i = 0; i < hits.length; i++) {
const h = hits[i];
lines.push(`[${i + 1}] ${h.source_corpus} (score=${h.score.toFixed(2)}, doc=${h.doc_id}): ${h.text.replace(/\s+/g, " ").trim()}`);
}
lines.push("═══");
lines.push("");
return lines.join("\n");
}
async function chat(model: string, prompt: string): Promise<{ content: string; error?: string }> {
try {
const r = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "openrouter",
model,
messages: [{ role: "user", content: prompt }],
max_tokens: 1500,
temperature: 0.1,
}),
signal: AbortSignal.timeout(90000),
});
if (!r.ok) return { content: "", error: `HTTP ${r.status}: ${(await r.text()).slice(0, 200)}` };
const j: any = await r.json();
return { content: j.choices?.[0]?.message?.content ?? "" };
} catch (e: any) { return { content: "", error: e.message }; }
}
async function observerReview(input: { permit_id: string; model: string; response: string; permit_text: string }): Promise<{ verdict: string; confidence: number; notes: string; source: string }> {
try {
const r = await fetch(`${OBSERVER}/review`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
file_path: `chicago_permit/${input.permit_id}`,
model: input.model,
response: input.response,
source_content: input.permit_text,
grounding_stats: { total: 0, grounded: 0, groundedPct: null },
attempt: 1,
}),
signal: AbortSignal.timeout(90000),
});
if (!r.ok) return { verdict: "accept", confidence: 50, notes: `observer ${r.status}`, source: "fallthrough" };
return await r.json();
} catch (e: any) { return { verdict: "accept", confidence: 50, notes: `observer error: ${e.message}`, source: "fallthrough" }; }
}
async function analyzeOne(p: Permit, idx: number, total: number): Promise<any> {
const permit_id = p.permit_ ?? `unknown_${idx}`;
const t0 = Date.now();
log(`══ permit ${idx + 1}/${total} · ${permit_id} · type=${p.permit_type} · cost=$${Number(p.reported_cost ?? 0).toLocaleString()}`);
const permitText = permitToText(p);
// Build matrix query: combine type + work description + contractor name for retrieval anchoring
const c1 = (p as any).contact_1_name ?? (typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? ""));
const matrixQuery = `${p.permit_type ?? ""} ${(p.work_description ?? "").slice(0, 300)} ${c1}`;
const matrix = await fetchMatrixHits(matrixQuery);
const corporaSummary = Object.entries(matrix.by_corpus).map(([k, v]) => `${k.split("_v")[0]}=${v}`).join(" ");
log(` 📖 matrix: ${matrix.hits.length} hits in ${matrix.latency_ms}ms · ${corporaSummary}`);
const preamble = buildMatrixPreamble(matrix.hits);
const task = `${preamble}You are a staffing-intelligence analyst reviewing a real Chicago building permit. Using the MATRIX CONTEXT above as evidence, produce a structured analysis:
PERMIT:
${permitText}
Produce a markdown analysis with:
1. **Permit summary**: 2 sentences on what this is
2. **Contractor signal**: what we know about the named contractor(s) from matrix context (cite [N] hits). If unknown, say so.
3. **Staffing fit**: what trades/headcount/skills this permit implies
4. **Risk flags**: anything in matrix context that suggests caution (debarment, prior incidents, low-quality history). If none, say so.
5. **Opportunity score**: 0-100 with one-sentence rationale
Cite matrix hits as [N] inline. If matrix has no relevant hits, say "no matrix evidence"; do NOT invent contractor history.`;
const resp = await chat("x-ai/grok-4.1-fast", task);
if (resp.error) {
log(` ✗ chat error: ${resp.error.slice(0, 100)}`);
return { permit_id, ok: false, error: resp.error, ts: new Date().toISOString() };
}
log(` ✓ analysis ${resp.content.length} chars`);
const verdict = await observerReview({ permit_id, model: "openrouter/x-ai/grok-4.1-fast", response: resp.content, permit_text: permitText });
log(` observer: ${verdict.verdict} (conf=${verdict.confidence}, src=${verdict.source})`);
return {
permit_id, ok: true,
permit_type: p.permit_type, cost: Number(p.reported_cost ?? 0),
contractor: c1, matrix_hits: matrix.hits.length, matrix_corpora: matrix.by_corpus, matrix_ms: matrix.latency_ms,
analysis: resp.content,
observer_verdict: verdict.verdict, observer_conf: verdict.confidence, observer_notes: verdict.notes, observer_src: verdict.source,
duration_ms: Date.now() - t0, ts: new Date().toISOString(),
};
}
async function main() {
const n = Number(process.argv[2] ?? 5);
log(`fetching ${n} permits from raw bucket...`);
const permits = await fetchPermits(n);
log(`analyzing ${permits.length} permits sequentially...`);
const fs = await import("node:fs/promises");
const { appendFile } = fs;
const results: any[] = [];
for (let i = 0; i < permits.length; i++) {
const r = await analyzeOne(permits[i], i, permits.length);
results.push(r);
await appendFile(OUTPUT, JSON.stringify(r) + "\n");
}
log(`\n══ SUMMARY ══`);
const ok = results.filter(r => r.ok).length;
const accepted = results.filter(r => r.observer_verdict === "accept").length;
const cycled = results.filter(r => r.observer_verdict === "cycle").length;
const rejected = results.filter(r => r.observer_verdict === "reject").length;
const avgHits = results.reduce((a, r) => a + (r.matrix_hits ?? 0), 0) / Math.max(1, results.length);
log(` permits analyzed: ${ok}/${results.length}`);
log(` observer: accept=${accepted} cycle=${cycled} reject=${rejected}`);
log(` avg matrix hits per permit: ${avgHits.toFixed(1)}`);
log(` output: ${OUTPUT}`);
}
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });
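
`fetchPermits` above samples the filtered permits with a fixed stride of `floor(length / n)` starting at index 0. A standalone sketch of that sampling step, under the hypothetical name `sampleEvenly`, makes the behavior easy to check in isolation:

```typescript
// Picks up to n items at a fixed stride from the start of the array,
// mirroring the loop in fetchPermits.
function sampleEvenly<T>(items: T[], n: number): T[] {
  const sampled: T[] = [];
  const stride = Math.max(1, Math.floor(items.length / n));
  for (let i = 0; i < items.length && sampled.length < n; i += stride) {
    sampled.push(items[i]);
  }
  return sampled;
}

// Ten items, three samples: stride is 3, so indices 0, 3 and 6 are taken.
console.log(sampleEvenly([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3)); // [ 0, 3, 6 ]
```

Because the walk starts at index 0 and stops once `n` items are collected, the tail of the list is never sampled. With 552 qualifying permits and `n = 5`, for example, the stride is 110 and the picks are indices 0, 110, 220, 330 and 440. If the input is ordered by issue date, that skews the sample toward one end of the date range.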

95
scripts/dump_raw_corpus.sh Executable file

@ -0,0 +1,95 @@
#!/bin/bash
# One-shot dump of all testing data into the `raw` MinIO bucket.
# Persistent test corpus so we don't re-extract every run.
#
# Layout:
# raw/
# staffing/: workers_500k.parquet, resumes.parquet
# entities/: entities.jsonl, svep_log.json
# sec/: company_tickers.json
# llm_team/: *.jsonl extracts from knowledge_base PG tables
# chicago/: permits_<date>.json (last 30 days), tif_districts.geojson
# MANIFEST.json: documents what's here + when
set -euo pipefail
REPO=/home/profit/lakehouse
BUCKET=raw
ALIAS=local
STAGE=$(mktemp -d /tmp/raw_dump.XXXXX)
trap 'rm -rf "$STAGE"' EXIT
DATE=$(date -u +%Y-%m-%d)
log() { echo "[dump $(date -u +%H:%M:%S)] $*"; }
log "creating bucket ${ALIAS}/${BUCKET} (idempotent)"
mc mb --ignore-existing ${ALIAS}/${BUCKET}
# ─── 1. STAFFING ───
log "staffing/ — workers_500k.parquet (323 MB) + resumes.parquet"
mc cp -q ${REPO}/data/datasets/workers_500k.parquet ${ALIAS}/${BUCKET}/staffing/workers_500k.parquet
mc cp -q ${REPO}/data/datasets/resumes.parquet ${ALIAS}/${BUCKET}/staffing/resumes.parquet
# ─── 2. ENTITIES + SEC + GEO ───
log "entities/ — contractor entities cache + SEC tickers + svep + tif districts"
mc cp -q ${REPO}/data/_entity_cache/entities.jsonl ${ALIAS}/${BUCKET}/entities/entities.jsonl
mc cp -q ${REPO}/data/_entity_cache/sec_company_tickers.json ${ALIAS}/${BUCKET}/sec/company_tickers.json
mc cp -q ${REPO}/data/_entity_cache/svep_log.json ${ALIAS}/${BUCKET}/entities/svep_log.json
mc cp -q ${REPO}/data/_entity_cache/tif_districts.geojson ${ALIAS}/${BUCKET}/chicago/tif_districts.geojson
# ─── 3. LLM TEAM HISTORY (Postgres → JSONL → S3) ───
log "llm_team/ — extracting from knowledge_base PG tables"
LLM_TABLES=(team_runs pipeline_runs lab_experiments lab_trials meta_pipelines meta_runs conversations response_cache memory_entries adaptive_runs)
for tbl in "${LLM_TABLES[@]}"; do
out=${STAGE}/${tbl}.jsonl
rows=$(sudo -u postgres psql -d knowledge_base -At -c "SELECT COUNT(*) FROM ${tbl};" 2>/dev/null || echo 0)
if [ "$rows" -eq 0 ]; then
log " · ${tbl}: 0 rows, skipping"
continue
fi
sudo -u postgres psql -d knowledge_base -At -c "COPY (SELECT row_to_json(t) FROM ${tbl} t) TO STDOUT;" > "$out" 2>/dev/null
size=$(du -h "$out" | awk '{print $1}')
log " · ${tbl}: ${rows} rows (${size})"
mc cp -q "$out" ${ALIAS}/${BUCKET}/llm_team/${tbl}.jsonl
done
# ─── 4. CHICAGO PERMITS (last 30 days, single request capped at 10000 rows) ───
log "chicago/ — pulling last 30 days of permits from data.cityofchicago.org"
since=$(date -u -d '30 days ago' +%Y-%m-%d)
out=${STAGE}/permits_${DATE}.json
url="https://data.cityofchicago.org/resource/ydr8-5enu.json?\$where=issue_date%3E='${since}'&\$limit=10000&\$order=issue_date%20DESC"
if curl -sf --max-time 60 "$url" -o "$out"; then
count=$(python3 -c "import json; print(len(json.load(open('${out}'))))")
size=$(du -h "$out" | awk '{print $1}')
log " · permits since ${since}: ${count} records (${size})"
mc cp -q "$out" ${ALIAS}/${BUCKET}/chicago/permits_${DATE}.json
else
log " · WARN: chicago permits fetch failed; skipping"
fi
# ─── 5. MANIFEST ───
log "writing MANIFEST.json"
manifest=${STAGE}/MANIFEST.json
python3 - <<PY
import json, subprocess, datetime
out = subprocess.check_output(['mc','ls','-r','--json','${ALIAS}/${BUCKET}'], text=True)
items = []
for line in out.strip().split('\n'):
if not line: continue
o = json.loads(line)
items.append({'key': o.get('key',''), 'size': o.get('size',0)})
total_size = sum(i['size'] for i in items)
manifest = {
'bucket': '${BUCKET}',
'created_at': datetime.datetime.utcnow().isoformat() + 'Z',
'total_objects': len(items),
'total_size_bytes': total_size,
'total_size_human': f'{total_size / (1024*1024):.1f} MB',
'items': items,
}
with open('${manifest}','w') as f:
json.dump(manifest, f, indent=2)
PY
mc cp -q "$manifest" ${ALIAS}/${BUCKET}/MANIFEST.json
log "DONE. Bucket contents:"
mc ls -r ${ALIAS}/${BUCKET} | head -30
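
Step 4 of the dump script above fetches the last 30 days of permits in one request with `$limit=10000`, so a window with more rows than that would be silently truncated. The sketch below shows the same query built in TypeScript, with an optional `$offset` for walking further pages. The helper names (`isoDateDaysAgo`, `permitsSinceUrl`) are hypothetical, and paging with `$offset` is an assumption about how one might extend the script, not something the script does.

```typescript
// UTC calendar date N days before `now`, formatted YYYY-MM-DD
// (the equivalent of `date -u -d '30 days ago' +%Y-%m-%d` in the script).
function isoDateDaysAgo(now: Date, days: number): string {
  return new Date(now.getTime() - days * 86400000).toISOString().slice(0, 10);
}

// Builds the permits query used in step 4. `offset` is a hypothetical
// extension for fetching pages beyond the first `limit` rows.
function permitsSinceUrl(since: string, limit: number, offset = 0): string {
  const base = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
  const params = new URLSearchParams({
    $where: `issue_date >= '${since}'`,
    $order: "issue_date DESC",
    $limit: String(limit),
  });
  if (offset > 0) params.set("$offset", String(offset));
  return `${base}?${params.toString()}`;
}
```

A caller could request pages at offsets 0, 10000, 20000 and so on until a page comes back with fewer than `limit` rows.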


@ -0,0 +1,177 @@
#!/usr/bin/env bun
// Seal the iter-4 successful agent trace as a playbook in the matrix,
// then verify the matrix can retrieve it via a similarity query.
//
// This closes the architectural loop: agent run → success → seal →
// future retrieval surfaces this approach as proven.
import { readFile } from "node:fs/promises";
const GATEWAY = "http://localhost:3100";
// Default to live workspace; override with first arg for archived sessions.
const SESSION_DIR = process.argv[2] ?? "/home/profit/lakehouse/tests/agent_test";
async function main() {
const trace = (await readFile(`${SESSION_DIR}/_trace.jsonl`, "utf8"))
.split("\n").filter(l => l.trim()).map(l => JSON.parse(l));
const finalMd = await readFile(`${SESSION_DIR}/_final.md`, "utf8");
// Extract tool sequence from trace
const toolCalls = trace.filter(t => t.kind === "tool_call");
const toolSeq = toolCalls.map(t => t.tool).join(" → ");
const totalSteps = toolCalls.length;
const totalLatency = trace.filter(t => t.latency_ms).reduce((a, t) => a + (t.latency_ms ?? 0), 0);
console.log(`iter-4 trace: ${trace.length} events, ${totalSteps} tool calls, ${(totalLatency/1000).toFixed(1)}s total`);
console.log(`tool sequence: ${toolSeq}`);
console.log(`final output: ${finalMd.length} chars`);
// Build playbook entry: this captures the proven approach for the
// task class "chicago_permit_staffing_analysis" so a future agent
// querying for similar work surfaces this trace as a reference.
const operation = `Chicago permit staffing analysis — qwen3.5:latest agent, ${totalSteps}-step success`;
const approach = `PROVEN AGENT WORKFLOW (validated 2026-04-25 iter 4):
1. PLAN FIRST via note() → explicit step list before any execution
2. list_permits(min_cost=N) → get high-cost candidates
3. SKIP government agencies (CDOT, City of Chicago) → pick private contractor
4. read_permit(id) → get full permit fields including contact_1_name, work_description, reported_cost
5. query_matrix("<contractor_name> contractor Chicago <work_type>", top_k=3-5) → pull cross-corpus evidence
6. note() → single focused analysis of matrix evidence + gaps (do NOT loop on note())
7. done(summary=<5-section markdown>) → Permit Summary, Contractor Profile, Staffing Implications, Risk Signals, Recommendation
KEY LESSONS:
- llm_team_runs_v1 + llm_team_response_cache_v1 are noise corpora; exclude them
- Useful corpora: chicago_permits_v1, entity_brief_v1, sec_tickers_v1, distilled_procedural_v20260423102847
- Matrix often returns "no specific evidence" for private contractors; that's OK, acknowledge the gap honestly, do NOT invent history
- Recommendation should reflect actual evidence: "Investigate-Further" when matrix is empty, not generic "Pursue"
- Total wall time: 30s for 6 tool calls`;
const context = `PRD: tests/agent_test/PRD.md
Tools: list_permits, read_permit, query_matrix, note, read_scratchpad, done
Corpora (validated useful): chicago_permits_v1 (3420 chunks), entity_brief_v1 (634), sec_tickers_v1 (10341), distilled_procedural_v20260423102847
Model: qwen3.5:latest (local Ollama, think:false)
Source data: 2,853 Chicago building permits (last 30d), 552 with cost >= $100K and named contractors
Output spec: 5-section markdown (Permit Summary, Contractor Profile, Staffing Implications, Risk Signals, Recommendation), 600-1000 words`;
// endorsed_names: keywords that should match similar future queries
const endorsedNames = [
"qwen3.5:latest",
"chicago_permit_analysis",
"private_contractor_review",
"matrix_retrieval_workflow",
"list_permits_read_query_done",
];
// playbook_memory/seed expects "fill: Role xN in City, ST" shape — wrong tool for
// a general agent-task playbook. Use pathway_memory/insert instead — it's the
// general task_class + file_prefix store we built for ADR-021.
console.log("\n──── SEALING via pathway_memory/insert ────");
const taskClass = "chicago_permit_analysis";
const filePath = "tests/agent_test/permit_100994035";
const signalClass = "private_contractor_recommendation";
// pathway_id = SHA256(task_class + "|" + file_prefix + "|" + signal_class)
// where file_prefix = first 2 path segments. Matches gateway's hot-swap logic.
const filePrefix = filePath.split("/").slice(0, 2).join("/");
const hasher = new Bun.CryptoHasher("sha256");
hasher.update(`${taskClass}|${filePrefix}|${signalClass}`);
const pathwayId = hasher.digest("hex");
console.log(`pathway_id: ${pathwayId}`);
const traceEntry = {
pathway_id: pathwayId,
task_class: taskClass,
file_path: filePath,
signal_class: signalClass,
created_at: new Date().toISOString(),
ladder_attempts: toolCalls.map((t, i) => ({
rung: i + 1,
model: `qwen3.5:latest+${t.tool}`,
latency_ms: t.latency_ms ?? 0,
accepted: t.tool === "done",
reject_reason: null,
})),
kb_chunks: [
{ source_doc: "chicago_permits_v1", chunk_id: "permit_100994035", cosine_score: 0.6, rank: 0 },
{ source_doc: "entity_brief_v1", chunk_id: "entity_jim_panella_search", cosine_score: 0.58, rank: 1 },
{ source_doc: "sec_tickers_v1", chunk_id: "sec_no_match", cosine_score: 0.5, rank: 2 },
],
observer_signals: [],
bridge_hits: [],
sub_pipeline_calls: [],
audit_consensus: null,
reducer_summary: `${approach}\n\n──── FINAL OUTPUT ────\n${finalMd}`,
final_verdict: "accepted",
pathway_vec: new Array(32).fill(0), // zero placeholder; only replaced if the gateway computes the vector itself
replay_count: 0,
replays_succeeded: 0,
semantic_flags: [],
type_hints_used: [],
bug_fingerprints: [],
retired: false,
};
// Use Mem0-style upsert (J 2026-04-25). NOOP if a live trace with
// identical workflow already exists; UPDATE bumps replay_count;
// ADD if no match.
const seal = await fetch(`${GATEWAY}/vectors/pathway/upsert`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(traceEntry),
signal: AbortSignal.timeout(30000),
});
if (!seal.ok) {
console.error(`✗ seal failed: ${seal.status} ${(await seal.text()).slice(0, 300)}`);
process.exit(1);
}
const sealResult = await seal.json();
console.log(`✓ sealed via pathway/upsert: ${JSON.stringify(sealResult).slice(0, 300)}`);
// sealResult.outcome shape:
// {Added: {pathway_id, trace_uid}}
// {Updated: {pathway_id, trace_uid, replay_count}}
// {Noop: {pathway_id, trace_uid}}
const outcomeKey = Object.keys(sealResult.outcome ?? {})[0];
console.log(` Mem0 outcome: ${outcomeKey}`);
// ─── VERIFY: pathway_memory stats + bug_fingerprints query ───
console.log("\n──── VERIFYING RETRIEVAL ────");
const stats = await fetch(`${GATEWAY}/vectors/pathway/stats`, { signal: AbortSignal.timeout(10000) });
if (stats.ok) {
const s: any = await stats.json();
console.log(`pathway_memory stats: total=${s.total_pathways} retired=${s.retired} reuse_rate=${s.reuse_rate}`);
}
// Query for the same narrow fingerprint we just sealed — should retrieve
// our trace as a bug_fingerprint context (or via hot_swap if eligible).
const fpQuery = await fetch(`${GATEWAY}/vectors/pathway/bug_fingerprints`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
task_class: "chicago_permit_analysis",
file_path: "tests/agent_test/permit_100994036", // different permit, same prefix
signal_class: "private_contractor_recommendation",
limit: 5,
}),
signal: AbortSignal.timeout(10000),
});
if (fpQuery.ok) {
const result: any = await fpQuery.json();
const fps = result.fingerprints ?? result;
console.log(`bug_fingerprints retrieval (sister permit, same prefix): ${JSON.stringify(fps).slice(0, 400)}`);
}
// Confirm the trace landed in state.json
const stateProbe = Bun.file("/home/profit/lakehouse/data/_pathway_memory/state.json"); // Bun.file() is lazy and synchronous; no await needed
if (await stateProbe.exists()) {
const state: any = JSON.parse(await stateProbe.text());
let found = false;
for (const traces of Object.values(state.pathways ?? {}) as any[][]) {
for (const t of traces) {
if (t.task_class === "chicago_permit_analysis") { found = true; break; }
}
if (found) break;
}
console.log(`state.json contains chicago_permit_analysis trace: ${found}`);
}
}
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });
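
The seal script's comments describe `pathway_id` as a SHA-256 over `task_class|file_prefix|signal_class`, where `file_prefix` is the first two path segments. A minimal sketch of that derivation follows, using `node:crypto` instead of `Bun.CryptoHasher` so it also runs outside Bun. `derivePathwayId` is a hypothetical helper name, and the gateway's own implementation is not shown in this diff, so it may differ in detail.

```typescript
import { createHash } from "node:crypto";

// Hypothetical helper: mirrors the derivation described in the seal script's
// comments. The gateway's real hashing code is not part of this diff.
function derivePathwayId(taskClass: string, filePath: string, signalClass: string): string {
  // file_prefix = first two path segments, e.g. "tests/agent_test"
  const filePrefix = filePath.split("/").slice(0, 2).join("/");
  return createHash("sha256")
    .update(`${taskClass}|${filePrefix}|${signalClass}`)
    .digest("hex");
}

const sealed = derivePathwayId(
  "chicago_permit_analysis",
  "tests/agent_test/permit_100994035",
  "private_contractor_recommendation",
);
const sister = derivePathwayId(
  "chicago_permit_analysis",
  "tests/agent_test/permit_100994036",
  "private_contractor_recommendation",
);

// Both permits live under the same two-segment prefix, so they share one id.
console.log(sealed === sister); // true
```

Because only the first two path segments enter the hash, every permit under `tests/agent_test/` lands in the same bucket, which is why the verification step can query with a different permit id and still expect a hit.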


@ -0,0 +1,259 @@
#!/usr/bin/env bun
// Vectorize each raw-bucket corpus into a queryable matrix index.
// Copies each source from the MinIO raw bucket to a local stage dir via `mc cp`, shapes it into
// {id, text} docs, POSTs to gateway /vectors/index, polls job to done.
//
// Targets one index per source with stable names so MATRIX_CORPORA_FOR_TASK
// can reference them. Idempotent: re-running rebuilds with a fresh _v2.
//
// Usage:
// bun run scripts/vectorize_raw_corpus.ts [source...]
// Default: runs all sources in order. Sources: chicago, entities, sec, llm_team_runs, llm_team_response
const GATEWAY = process.env.LAKEHOUSE_URL ?? "http://localhost:3100";
const RAW_BUCKET = "raw";
const MC_ALIAS = "local";
const STAGE_DIR = "/tmp/vectorize_raw";
interface Doc { id: string; text: string }
interface SourceSpec {
name: string; // CLI flag
index_name: string; // /vectors/index target
s3_key: string; // path under raw/
source_label: string; // gateway "source" field
chunk_size?: number;
overlap?: number;
extractor: (raw: string) => Doc[];
}
// Spawn mc to copy from S3 → local stage so we can read it
async function fetchFromRaw(key: string): Promise<string> {
const fs = await import("node:fs/promises");
await fs.mkdir(STAGE_DIR, { recursive: true });
const local = `${STAGE_DIR}/${key.replace(/\//g, "_")}`;
const proc = Bun.spawn(["mc", "cp", "-q", `${MC_ALIAS}/${RAW_BUCKET}/${key}`, local]);
await proc.exited;
if (proc.exitCode !== 0) throw new Error(`mc cp failed for ${key}`);
return local;
}
async function readJsonl(path: string): Promise<any[]> {
const text = await Bun.file(path).text();
return text.split("\n").filter(l => l.trim()).map(l => JSON.parse(l));
}
function truncate(s: string, n = 4000): string {
return s == null ? "" : (s.length > n ? s.slice(0, n) : s);
}
// Sanitize text before posting as JSON. Strips control chars, every
// backslash (so no truncated \uXXXX escape can survive), and UTF-16
// surrogate code units; input like this broke Rust's serde JSON parser
// at the gateway. Llm_team response cache had rows with truncated \u
// escapes that 400'd the whole batch.
function sanitize(s: string): string {
if (!s) return "";
return s
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, "") // strip control chars
.replace(/\\/g, "") // strip ALL backslashes — kills any malformed \uXXXX in source data
.replace(/[\uD800-\uDFFF]/g, ""); // strip every UTF-16 surrogate code unit: lone halves left by truncate(), and intact emoji pairs too
}
// ─── EXTRACTORS — one per source ───
// Each shapes raw rows into {id, text} for the gateway's chunker.
function extractChicagoPermits(raw: string): Doc[] {
const arr = JSON.parse(raw);
return arr.map((p: any, i: number) => {
const text = [
`Permit ${p.permit_ ?? p.permit_number ?? `unknown_${i}`}`,
`Type: ${p.permit_type ?? "?"} Status: ${p.permit_status ?? "?"}`,
`Address: ${p.street_number ?? ""} ${p.street_direction ?? ""} ${p.street_name ?? ""} ${p.suffix ?? ""}`.trim(),
`Issued: ${p.issue_date ?? "?"} Applied: ${p.application_start_date ?? "?"}`,
`Work: ${truncate(p.work_description ?? "", 800)}`,
`Estimated cost: ${p.reported_cost ?? p.estimated_cost ?? "?"}`,
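`Contractors: ${p.contact_1_name ?? p.contact_1 ?? ""} | ${p.contact_2_name ?? p.contact_2 ?? ""}`, // permit field is contact_N_name; bare contact_N kept as fallback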
`Owner: ${p.contact_3_name ?? ""} (${p.contact_3_type ?? ""})`,
`Subtotal paid: ${p.subtotal_paid ?? ""} community area=${p.community_area ?? ""} ward=${p.ward ?? ""}`,
].filter(Boolean).join("\n");
return { id: `permit_${p.permit_ ?? p.id ?? i}`, text };
});
}
function extractEntities(raw: string): Doc[] {
return raw.split("\n").filter(l => l.trim()).map((line, i) => {
try {
const e = JSON.parse(line);
const name = e.normalized_name ?? e.name ?? e.display_name ?? `entity_${i}`;
const text = [
`Entity: ${name}`,
`Display: ${e.display_name ?? name}`,
e.ticker ? `Ticker: ${e.ticker}` : "",
e.cik ? `CIK: ${e.cik}` : "",
e.aliases ? `Aliases: ${(e.aliases ?? []).join(", ")}` : "",
e.last_seen ? `Last seen: ${e.last_seen}` : "",
e.notes ? `Notes: ${truncate(JSON.stringify(e.notes), 600)}` : "",
`Raw: ${truncate(JSON.stringify(e), 1500)}`,
].filter(Boolean).join("\n");
return { id: `entity_${name}_${i}`, text };
} catch {
return { id: `entity_${i}`, text: line.slice(0, 1000) };
}
});
}
function extractSecTickers(raw: string): Doc[] {
// SEC tickers JSON: {"_fetched_at": ..., "rows": {"0": {cik_str, ticker, title}, ...}}
const obj = JSON.parse(raw);
// The actual rows are under .rows; fall back to top-level if no wrapper.
const rows = obj.rows ?? obj;
return Object.values(rows)
.filter((r: any) => r && typeof r === "object" && r.ticker)
.map((row: any, i: number) => ({
id: `sec_${row.ticker ?? i}`,
text: `Ticker: ${row.ticker}\nCompany: ${row.title ?? "?"}\nCIK: ${row.cik_str ?? "?"}`,
}));
}
function extractLlmTeamRuns(raw: string): Doc[] {
return raw.split("\n").filter(l => l.trim()).map((line, i) => {
try {
const r = JSON.parse(line);
const text = [
`Team run ${r.id ?? i}`,
`Mode: ${r.mode ?? "?"} Created: ${r.created_at ?? "?"}`,
r.prompt ? `Prompt: ${truncate(r.prompt, 1200)}` : "",
r.input ? `Input: ${truncate(typeof r.input === "string" ? r.input : JSON.stringify(r.input), 1200)}` : "",
r.output ? `Output: ${truncate(typeof r.output === "string" ? r.output : JSON.stringify(r.output), 2000)}` : "",
r.result ? `Result: ${truncate(typeof r.result === "string" ? r.result : JSON.stringify(r.result), 2000)}` : "",
r.metadata ? `Meta: ${truncate(JSON.stringify(r.metadata), 600)}` : "",
].filter(Boolean).join("\n");
return { id: `team_run_${r.id ?? i}`, text };
} catch {
return { id: `team_run_${i}`, text: line.slice(0, 2000) };
}
});
}
function extractLlmTeamResponseCache(raw: string): Doc[] {
return raw.split("\n").filter(l => l.trim()).map((line, i) => {
try {
const r = JSON.parse(line);
const text = [
`Cached response ${r.cache_key ?? r.id ?? i}`,
`Created: ${r.created_at ?? "?"}`,
r.prompt ? `Prompt: ${sanitize(truncate(r.prompt, 1500))}` : "",
r.response ? `Response: ${sanitize(truncate(r.response, 2500))}` : "",
r.model ? `Model: ${r.model}` : "",
].filter(Boolean).join("\n");
return { id: `resp_${r.cache_key ?? r.id ?? i}`, text };
} catch {
return { id: `resp_${i}`, text: sanitize(line.slice(0, 2000)) };
}
});
}
const SOURCES: SourceSpec[] = [
{ name: "chicago", index_name: "chicago_permits_v1", s3_key: "chicago/permits_2026-04-25.json",
source_label: "chicago_permits", chunk_size: 600, overlap: 80, extractor: extractChicagoPermits },
{ name: "entities", index_name: "entity_brief_v1", s3_key: "entities/entities.jsonl",
source_label: "entity_brief", chunk_size: 500, overlap: 60, extractor: extractEntities },
{ name: "sec", index_name: "sec_tickers_v1", s3_key: "sec/company_tickers.json",
source_label: "sec_tickers", chunk_size: 200, overlap: 20, extractor: extractSecTickers },
{ name: "llm_team_runs", index_name: "llm_team_runs_v1", s3_key: "llm_team/team_runs.jsonl",
source_label: "llm_team_runs", chunk_size: 800, overlap: 100, extractor: extractLlmTeamRuns },
{ name: "llm_team_response", index_name: "llm_team_response_cache_v1", s3_key: "llm_team/response_cache.jsonl",
source_label: "llm_team_response_cache", chunk_size: 800, overlap: 100, extractor: extractLlmTeamResponseCache },
];
async function vectorizeOne(spec: SourceSpec): Promise<{ ok: boolean; chunks: number; job_id?: string; err?: string }> {
const t0 = Date.now();
console.log(`\n━━━ ${spec.name} → ${spec.index_name} ━━━`);
console.log(`fetching s3://${RAW_BUCKET}/${spec.s3_key}`);
let local: string;
try { local = await fetchFromRaw(spec.s3_key); }
catch (e: any) { return { ok: false, chunks: 0, err: `fetch: ${e.message}` }; }
console.log(`reading + extracting...`);
const raw = await Bun.file(local).text();
const docs = spec.extractor(raw);
if (docs.length === 0) return { ok: false, chunks: 0, err: "0 docs after extraction" };
console.log(` ${docs.length} docs (avg ${Math.round(docs.reduce((a, d) => a + d.text.length, 0) / docs.length)} chars)`);
console.log(`POST /vectors/index ${spec.index_name} ...`);
const resp = await fetch(`${GATEWAY}/vectors/index`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
index_name: spec.index_name,
source: spec.source_label,
documents: docs,
chunk_size: spec.chunk_size,
overlap: spec.overlap,
}),
signal: AbortSignal.timeout(300000),
});
if (!resp.ok) {
const body = await resp.text();
return { ok: false, chunks: 0, err: `HTTP ${resp.status}: ${body.slice(0, 300)}` };
}
const j: any = await resp.json();
const ms = Date.now() - t0;
console.log(` ✓ submitted: job=${j.job_id} chunks=${j.chunks} (extract+submit ${(ms/1000).toFixed(1)}s)`);
return { ok: true, chunks: j.chunks, job_id: j.job_id };
}
async function pollJob(jobId: string): Promise<{ status: string; processed: number; total: number }> {
const r = await fetch(`${GATEWAY}/vectors/jobs/${jobId}`, { signal: AbortSignal.timeout(5000) });
if (!r.ok) return { status: "unknown", processed: 0, total: 0 };
const j: any = await r.json();
return { status: j.status ?? "?", processed: j.processed ?? 0, total: j.total ?? 0 };
}
async function waitForJob(jobId: string, label: string, maxSec = 600): Promise<void> {
const t0 = Date.now();
let lastLog = 0;
while ((Date.now() - t0) / 1000 < maxSec) {
const s = await pollJob(jobId);
if (s.status === "complete" || s.status === "completed" || s.status === "done") {
console.log(`${label} job ${jobId.slice(0,8)} complete (${s.processed}/${s.total} in ${((Date.now()-t0)/1000).toFixed(0)}s)`);
return;
}
if (s.status === "failed" || s.status === "error") {
console.log(`${label} job ${jobId.slice(0,8)} failed at ${s.processed}/${s.total}`);
return;
}
if (Date.now() - lastLog > 15000) {
console.log(` · ${label} progress ${s.processed}/${s.total} (${s.status})`);
lastLog = Date.now();
}
await new Promise(r => setTimeout(r, 3000));
}
console.log(`${label} job ${jobId.slice(0,8)} still running after ${maxSec}s — leaving in background`);
}
async function main() {
const args = process.argv.slice(2);
const targets = args.length > 0 ? SOURCES.filter(s => args.includes(s.name)) : SOURCES;
console.log(`Vectorizing ${targets.length} source(s): ${targets.map(t => t.name).join(", ")}`);
const results: Array<{ name: string; result: any }> = [];
for (const spec of targets) {
try {
const r = await vectorizeOne(spec);
if (r.ok && r.job_id) await waitForJob(r.job_id, spec.name);
results.push({ name: spec.name, result: r });
} catch (e: any) {
console.error(`! ${spec.name}: ${e.message}`);
results.push({ name: spec.name, result: { ok: false, err: e.message } });
}
}
console.log(`\n━━━ SUMMARY ━━━`);
for (const { name, result } of results) {
console.log(` ${result.ok ? "✓" : "✗"} ${name.padEnd(20)} chunks=${result.chunks ?? 0} ${result.err ? `err=${result.err}` : ""}`);
}
}
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });
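
`sanitize()` in the script above removes every surrogate code unit, which also deletes well-formed emoji. If that loss matters, one alternative is to drop only unpaired surrogates and to truncate without splitting a pair in the first place. The sketch below is an illustration under that assumption, not the script's current behaviour; `stripLoneSurrogates` and `truncateSafe` are hypothetical names.

```typescript
// Remove only unpaired UTF-16 surrogates; well-formed pairs (emoji) survive.
// The regex has no `u` flag on purpose, so it matches individual code units.
function stripLoneSurrogates(s: string): string {
  return s.replace(
    /[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/g,
    "",
  );
}

// Truncate to at most n code units without cutting a surrogate pair in half.
function truncateSafe(s: string, n: number): string {
  if (s.length <= n) return s;
  const code = s.charCodeAt(n - 1);
  const endsOnHighSurrogate = code >= 0xd800 && code <= 0xdbff;
  return s.slice(0, endsOnHighSurrogate ? n - 1 : n);
}

const withEmoji = "ab\uD83D\uDE00"; // "ab" followed by one emoji (two code units)
const brokenByPlainSlice = withEmoji.slice(0, 3); // ends on a lone high surrogate

console.log(stripLoneSurrogates(brokenByPlainSlice) === "ab");
console.log(stripLoneSurrogates(withEmoji) === withEmoji);
console.log(truncateSafe(withEmoji, 3) === "ab");
```

The lookbehind in the regex needs a reasonably recent JavaScript engine. The backslash and control-character stripping from `sanitize()` would still apply on top of this.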

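
`waitForJob` above returns `void` on success, failure, and timeout alike, so the summary still prints a check mark for a job that failed after submission. One way to surface that, sketched here as an assumption and not as the script's design, is to classify the polled status into a small phase type that the caller can record. The status strings are the ones the polling loop already checks; the gateway's full vocabulary is not shown in this diff.

```typescript
type JobPhase = "done" | "failed" | "pending";

// Hypothetical helper: maps a polled status string to a phase.
// Any status the polling loop does not recognise is treated as still pending.
function classifyJobStatus(status: string): JobPhase {
  if (status === "complete" || status === "completed" || status === "done") return "done";
  if (status === "failed" || status === "error") return "failed";
  return "pending";
}

// Example: fold the final phase into the per-source result.
const submitted = { ok: true, chunks: 120 };
const finalPhase = classifyJobStatus("failed");
const summaryRow = { ...submitted, ok: submitted.ok && finalPhase === "done" };
console.log(summaryRow.ok);
```

With something like this, `waitForJob` could return the last phase and `main` could downgrade `ok` before printing the summary.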

@ -0,0 +1,353 @@
#!/usr/bin/env bun
// Agent harness — runs local qwen3.5:latest as an autonomous agent
// against PRD.md. Exposes a tool-call loop. Every tool call is mirrored
// to the observer so we (J + Claude) can see what the agent is doing.
//
// Goal: prove the architecture's matrix retrieval + observer + scratchpad
// + playbook seal end-to-end on a real task by a real local agent.
//
// Iter 1: just run it. Watch where it gets stuck.
// Iter N: tune helpers based on what we observed.
import { appendFile, readFile } from "node:fs/promises";
import { existsSync, mkdirSync } from "node:fs";
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const OBSERVER = "http://localhost:3800";
const PRD_PATH = "/home/profit/lakehouse/tests/agent_test/PRD.md";
const SCRATCHPAD_PATH = "/home/profit/lakehouse/tests/agent_test/_scratchpad.txt";
const TRACE_PATH = "/home/profit/lakehouse/tests/agent_test/_trace.jsonl";
const FINAL_PATH = "/home/profit/lakehouse/tests/agent_test/_final.md";
const PERMITS_RAW = "/tmp/vectorize_raw/chicago_permits_2026-04-25.json";
const AGENT_MODEL = process.env.AGENT_MODEL ?? "qwen3.5:latest";
const MAX_STEPS = Number(process.env.AGENT_MAX_STEPS ?? 15);
const SESSION_ID = `agent_${Date.now().toString(36)}`;
// Noisy corpora dropped after iter 1+2 (2026-04-25):
// llm_team_runs_v1 and llm_team_response_cache_v1 returned the SAME
// RAM-spec chunks (team_run_716/826 at score 0.59) regardless of query.
// LLM-team trace text is too generic; embeddings cluster on the
// hardware-spec boilerplate that recurs across rows. Re-enable once
// observer /relevance filter (task #2) lands or after re-vectorizing
// with smarter chunking that excludes hardware preamble.
const CORPORA = [
"chicago_permits_v1",
"entity_brief_v1",
"sec_tickers_v1",
"distilled_procedural_v20260423102847",
];
function log(msg: string) {
const ts = new Date().toISOString().slice(11, 19);
console.log(`[harness ${ts}] ${msg}`);
}
async function emitObserverEvent(payload: object) {
try {
await fetch(`${OBSERVER}/event`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ source: "agent_test", session_id: SESSION_ID, ...payload, ts: new Date().toISOString() }),
signal: AbortSignal.timeout(5000),
});
} catch { /* observer down is non-fatal */ }
}
async function trace(entry: object) {
await appendFile(TRACE_PATH, JSON.stringify({ ts: new Date().toISOString(), session_id: SESSION_ID, ...entry }) + "\n");
}
// ─── TOOLS — what the agent can call ───
let permitsCache: any[] | null = null;
async function loadPermits(): Promise<any[]> {
if (permitsCache) return permitsCache;
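if (!existsSync(PERMITS_RAW)) {
// Fetch from raw bucket via mc; create the stage dir first and fail loudly on a bad copy
mkdirSync("/tmp/vectorize_raw", { recursive: true });
const proc = Bun.spawn(["mc", "cp", "-q", "local/raw/chicago/permits_2026-04-25.json", PERMITS_RAW]);
await proc.exited;
if (proc.exitCode !== 0) throw new Error(`mc cp failed for ${PERMITS_RAW}`);
}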
permitsCache = JSON.parse(await readFile(PERMITS_RAW, "utf8"));
return permitsCache!;
}
async function tool_list_permits(args: { min_cost?: number; permit_type?: string }): Promise<string> {
const all = await loadPermits();
let filtered = all.filter(p => p.contact_1_name || p.contact_2_name);
if (args.min_cost) filtered = filtered.filter(p => Number(p.reported_cost ?? 0) >= args.min_cost!);
if (args.permit_type) filtered = filtered.filter(p => (p.permit_type ?? "").toLowerCase().includes(args.permit_type!.toLowerCase()));
filtered.sort((a, b) => Number(b.reported_cost ?? 0) - Number(a.reported_cost ?? 0));
const out = filtered.slice(0, 5).map(p =>
`- permit_id=${p.permit_} type=${p.permit_type} cost=$${Number(p.reported_cost ?? 0).toLocaleString()} contractor=${p.contact_1_name ?? "?"}`
).join("\n");
return `Top ${Math.min(5, filtered.length)} of ${filtered.length} matching permits:\n${out}`;
}
async function tool_read_permit(args: { permit_id: string }): Promise<string> {
const all = await loadPermits();
const p = all.find(x => String(x.permit_) === String(args.permit_id)); // the agent may emit permit_id as a JSON number
if (!p) return `permit ${args.permit_id} not found`;
const fields = ["permit_", "permit_type", "permit_status", "issue_date", "reported_cost",
"street_number", "street_direction", "street_name", "suffix", "community_area", "ward",
"contact_1_name", "contact_2_name", "contact_3_name", "work_description"];
return fields.map(f => `${f}: ${p[f] ?? ""}`).join("\n");
}
async function tool_query_matrix(args: { query: string; top_k?: number }): Promise<string> {
const k = args.top_k ?? 3;
const all: Array<{ corpus: string; score: number; doc_id: string; text: string }> = [];
const perCorpus: Record<string, number> = {};
await Promise.all(CORPORA.map(async (corpus) => {
try {
const r = await fetch(`${GATEWAY}/vectors/search`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ index_name: corpus, query: args.query, top_k: k }),
signal: AbortSignal.timeout(10000),
});
if (!r.ok) { perCorpus[corpus] = -1; return; }
const data: any = await r.json();
const results = data.results ?? [];
perCorpus[corpus] = results.length;
for (const h of results) {
all.push({ corpus, score: Number(h.score ?? 0), doc_id: String(h.doc_id ?? "?"), text: String(h.chunk_text ?? "").slice(0, 300) });
}
} catch { perCorpus[corpus] = -1; }
}));
all.sort((a, b) => b.score - a.score);
const top = all.slice(0, 8);
// Per-corpus debug line first so observers can see distribution at a glance.
const dist = Object.entries(perCorpus).map(([name, n]) => `${name.split("_v")[0]}=${n}`).join(" ");
if (top.length === 0) return `no matrix evidence for: ${args.query}\n(per-corpus: ${dist})`;
return `(per-corpus: ${dist})\n` + top.map((h, i) => `[${i + 1}] ${h.corpus} score=${h.score.toFixed(2)} doc=${h.doc_id}\n ${h.text.replace(/\s+/g, " ").trim()}`).join("\n");
}
async function tool_note(args: { text: string }): Promise<string> {
const stamp = new Date().toISOString().slice(11, 19);
await appendFile(SCRATCHPAD_PATH, `[${stamp}] ${args.text}\n`);
return `noted (${args.text.length} chars)`;
}
async function tool_read_scratchpad(): Promise<string> {
if (!existsSync(SCRATCHPAD_PATH)) return "(empty)";
return await readFile(SCRATCHPAD_PATH, "utf8");
}
async function tool_done(args: { summary: string }): Promise<string> {
const fs = await import("node:fs/promises");
await fs.writeFile(FINAL_PATH, args.summary);
return `done; final saved to ${FINAL_PATH} (${args.summary.length} chars)`;
}
const TOOLS: Record<string, (args: any) => Promise<string>> = {
list_permits: tool_list_permits,
read_permit: tool_read_permit,
query_matrix: tool_query_matrix,
note: tool_note,
read_scratchpad: tool_read_scratchpad,
done: tool_done,
};
const TOOL_SCHEMA = `Available tools (call by emitting JSON like: {"tool": "name", "args": {...}}):
- list_permits(min_cost?: number, permit_type?: string) → top 5 by cost
- read_permit(permit_id: string) → full permit fields
- query_matrix(query: string, top_k?: number) → search KB
- note(text: string) → append to scratchpad
- read_scratchpad() → read your scratchpad
- done(summary: string) → finish; pass final markdown analysis`;
// ─── AGENT LOOP ───
async function callAgent(messages: Array<{role: string; content: string}>): Promise<string> {
// think:false disables hidden reasoning so all generated tokens go to
// visible response. qwen3.5:latest defaults to thinking and silently
// burns the token budget otherwise.
const r = await fetch(`${SIDECAR}/generate`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
model: AGENT_MODEL,
prompt: messages.map(m => `${m.role.toUpperCase()}:\n${m.content}`).join("\n\n") + "\n\nASSISTANT:\n",
stream: false,
max_tokens: 1500,
think: false,
}),
signal: AbortSignal.timeout(180000),
});
if (!r.ok) throw new Error(`agent ${r.status}: ${(await r.text()).slice(0, 200)}`);
const j: any = await r.json();
return String(j.text ?? j.response ?? "").trim();
}
function extractToolCall(response: string): { tool: string; args: any } | null {
// Look for JSON block in the response
const fenced = response.match(/```(?:json)?\s*(\{[\s\S]+?\})\s*```/);
const candidate = fenced ? fenced[1] : (response.match(/\{[\s\S]*\}/)?.[0] ?? null);
if (!candidate) return null;
try {
const parsed = JSON.parse(candidate);
if (parsed.tool && typeof parsed.tool === "string") return { tool: parsed.tool, args: parsed.args ?? {} };
} catch { /* not JSON */ }
return null;
}
async function main() {
log(`session=${SESSION_ID} model=${AGENT_MODEL} max_steps=${MAX_STEPS}`);
// Reset workspace files for this session
for (const p of [SCRATCHPAD_PATH, TRACE_PATH, FINAL_PATH]) {
try { await Bun.write(p, ""); } catch { /* ignore */ }
}
const prd = await readFile(PRD_PATH, "utf8");
log(`loaded PRD (${prd.length} chars)`);
await emitObserverEvent({ event_kind: "agent_start", model: AGENT_MODEL });
// Pre-flight: pull prior accepted pathway traces for this task class
// and surface them as a "PROVEN APPROACHES" preamble. This closes the
// matrix loop — successful past runs now actively help the next agent.
let priorPlaybooks = "";
try {
const stateFile = Bun.file("/home/profit/lakehouse/data/_pathway_memory/state.json");
if (await stateFile.exists()) {
const state: any = JSON.parse(await stateFile.text());
const matched: any[] = [];
for (const traces of Object.values(state.pathways ?? {}) as any[][]) {
for (const t of traces) {
if (t.task_class === "chicago_permit_analysis" && t.final_verdict === "accepted" && !t.retired) {
matched.push(t);
}
}
}
matched.sort((a, b) => (b.created_at ?? "").localeCompare(a.created_at ?? ""));
if (matched.length > 0) {
const top = matched.slice(0, 2);
priorPlaybooks = "\n\n═══ 📖 PROVEN APPROACHES FROM PRIOR ACCEPTED RUNS ═══\n" +
top.map((t, i) =>
`[${i + 1}] pathway=${t.pathway_id?.slice(0, 12)} previously succeeded on ${t.file_path}\n` +
`Approach excerpt:\n${(t.reducer_summary ?? "").slice(0, 800)}`
).join("\n\n") +
"\n═══ end proven approaches ═══\n\nUse these as REFERENCE for what worked. Don't copy verbatim, but follow the same workflow shape (plan → list → read → matrix → analyze → done).\n";
log(`📖 found ${matched.length} prior accepted pathway(s) for chicago_permit_analysis — top ${top.length} prepended to agent context`);
} else {
log(`📖 no prior accepted pathways for chicago_permit_analysis (this is the first run)`);
}
}
} catch (e: any) {
log(`📖 pathway preamble skipped: ${e.message}`);
}
const systemMsg = `You are an autonomous agent. Read the PRD below and follow its instructions exactly.
${TOOL_SCHEMA}
To call a tool, respond with ONLY a JSON object: {"tool": "<name>", "args": {...}}
No markdown, no explanation around it. The harness will execute the tool and give you the result, then ask you what to do next.
When you are completely finished, call done(summary="<your final markdown>").`;
const messages: Array<{role: string; content: string}> = [
{ role: "system", content: systemMsg },
{ role: "user", content: `PRD:\n\n${prd}${priorPlaybooks}\n\nNow respond. Remember: PLAN first via note() before executing.` },
];
// Iter 3 surfaced: when the matrix returns real evidence, the agent
// gets analysis paralysis — keeps calling note() to refine instead of
// producing the final output. Guard: after MAX_CONSECUTIVE_NOTES
// note() calls in a row, harness injects a hard-stop user message
// telling the agent it MUST call done() next.
const MAX_CONSECUTIVE_NOTES = Number(process.env.AGENT_MAX_CONSECUTIVE_NOTES ?? 2);
let consecutiveNotes = 0;
let isDone = false;
for (let step = 1; step <= MAX_STEPS && !isDone; step++) {
log(`step ${step}/${MAX_STEPS} — calling agent...`);
const t0 = Date.now();
let response: string;
try {
response = await callAgent(messages);
} catch (e: any) {
log(` ✗ agent error: ${e.message}`);
await trace({ step, kind: "error", error: e.message });
await emitObserverEvent({ event_kind: "agent_error", step, error: e.message });
break;
}
const ms = Date.now() - t0;
log(` · agent responded ${response.length} chars in ${ms}ms`);
await trace({ step, kind: "agent_response", chars: response.length, latency_ms: ms, response: response.slice(0, 4000) });
const call = extractToolCall(response);
if (!call) {
log(` ⚠ no tool call extracted from response — agent may be confused`);
await trace({ step, kind: "no_tool_call", preview: response.slice(0, 500) });
await emitObserverEvent({ event_kind: "agent_no_tool", step, preview: response.slice(0, 200) });
// Push the agent: tell it to call a tool
messages.push({ role: "assistant", content: response });
messages.push({ role: "user", content: `Your last response did not contain a valid tool call. Respond with ONLY a JSON object like {"tool": "note", "args": {"text": "..."}}. No prose around it.` });
continue;
}
log(` → tool: ${call.tool}(${JSON.stringify(call.args).slice(0, 200)})`);
if (!TOOLS[call.tool]) {
const err = `unknown tool: ${call.tool}`;
log(`${err}`);
await trace({ step, kind: "tool_unknown", tool: call.tool });
await emitObserverEvent({ event_kind: "tool_unknown", step, tool: call.tool });
messages.push({ role: "assistant", content: response });
messages.push({ role: "user", content: `Tool "${call.tool}" does not exist. Available: ${Object.keys(TOOLS).join(", ")}. Try again.` });
continue;
}
const resStart = Date.now();
let result: string;
try {
result = await TOOLS[call.tool](call.args);
} catch (e: any) {
result = `TOOL ERROR: ${e.message}`;
}
const resMs = Date.now() - resStart;
log(`${result.slice(0, 200)}${result.length > 200 ? "..." : ""} (${resMs}ms)`);
await trace({ step, kind: "tool_call", tool: call.tool, args: call.args, result: result.slice(0, 4000), latency_ms: resMs });
await emitObserverEvent({ event_kind: "tool_call", step, tool: call.tool, result_chars: result.length });
if (call.tool === "done") {
isDone = true;
log(` ✓ DONE`);
await emitObserverEvent({ event_kind: "agent_done", step });
break;
}
// Track consecutive note() calls; force done() if too many in a row.
if (call.tool === "note") consecutiveNotes++;
else consecutiveNotes = 0;
messages.push({ role: "assistant", content: response });
if (consecutiveNotes >= MAX_CONSECUTIVE_NOTES) {
log(`${consecutiveNotes} consecutive note() calls — forcing done() next`);
await emitObserverEvent({ event_kind: "force_done_pressure", step, consecutive_notes: consecutiveNotes });
messages.push({ role: "user", content: `Tool result:\n${result}\n\nYou have called note() ${consecutiveNotes} times in a row without producing output. STOP NOTING. Call done(summary="<your final markdown>") NOW with whatever analysis you have. Do not call note() again. Respond with ONLY: {"tool": "done", "args": {"summary": "..."}}` });
consecutiveNotes = 0; // reset so we only push once per streak
} else {
messages.push({ role: "user", content: `Tool result:\n${result}\n\nWhat next?` });
}
}
if (!isDone) {
log(`✗ agent did not complete within ${MAX_STEPS} steps`);
await emitObserverEvent({ event_kind: "agent_max_steps", final_step: MAX_STEPS });
// Mem0: a failed run must not leave a broken playbook in future agents'
// preambles. This session has no trace_uid yet (the insert happens on
// done()), so it has nothing of its own to retire. Retiring prior traces
// that share this session's tool sequence is not implemented; for now the
// failure is only logged.
log(` ⚠ no playbook seal will be performed for failed run`);
}
log(`session ${SESSION_ID} ended. Trace: ${TRACE_PATH}`);
if (existsSync(FINAL_PATH)) log(`Final output: ${FINAL_PATH}`);
}
mkdirSync("/home/profit/lakehouse/tests/agent_test", { recursive: true });
main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });
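
The consecutive-note() guard in the loop above can be isolated as a small state machine. The following is a minimal sketch, not the file's actual structure: `makeNoteStreak` and `StreakAction` are hypothetical names introduced for illustration, and the limit of 3 is an assumed value for `MAX_CONSECUTIVE_NOTES`.

```typescript
// Hypothetical helper: makeNoteStreak and StreakAction are illustrative
// names that do not appear in the original file.
type StreakAction = "continue" | "force_done";

// Returns a recorder that counts consecutive note() calls and reports
// when the caller should push the "call done() now" message.
function makeNoteStreak(limit: number): (tool: string) => StreakAction {
  let count = 0;
  return (tool: string): StreakAction => {
    count = tool === "note" ? count + 1 : 0;
    if (count >= limit) {
      count = 0; // reset so the pressure message fires once per streak
      return "force_done";
    }
    return "continue";
  };
}

// Assumed limit of 3; the real MAX_CONSECUTIVE_NOTES value is not shown.
const record = makeNoteStreak(3);
const actions = ["note", "note", "read", "note", "note", "note", "note"].map(record);
console.log(actions.join(","));
// continue,continue,continue,continue,continue,force_done,continue
```

Any non-note tool call clears the streak, and the reset after `force_done` means a model that keeps noting is nudged again only after another full streak.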


@@ -0,0 +1,214 @@
#!/usr/bin/env bun
// Autonomous scrum loop — wraps scrum_master_pipeline.ts + scrum_applier.ts
// in a goal-driven retry loop. Observer is POSTed an iteration summary at
// every boundary so it can build meta-commentary outside the loop's epistemic
// scope.
//
// Usage:
// LOOP_TARGETS="crates/a/src/x.rs,crates/b/src/y.rs" \
// LOOP_MAX_ITERS=5 \
// LOOP_PUSH=1 \
// bun run tests/real-world/autonomous_loop.ts
//
// Stop conditions: max_iters reached OR 2 consecutive iters with 0 commits.
import { spawn } from "node:child_process";
import { appendFile, readFile } from "node:fs/promises";
import { existsSync } from "node:fs";
const REPO = "/home/profit/lakehouse";
const OBSERVER = process.env.LOOP_OBSERVER ?? "http://localhost:3800";
const BRANCH = process.env.LOOP_BRANCH ?? "scrum/auto-apply-19814";
const MAX_ITERS = Number(process.env.LOOP_MAX_ITERS ?? 3);
const PUSH = process.env.LOOP_PUSH === "1";
const MIN_CONF = process.env.LOOP_MIN_CONF ?? "85";
// Optional override — when unset, let scrum_applier.ts use ITS default
// (currently x-ai/grok-4.1-fast on openrouter). The prior hardcoded
// qwen3-coder:480b default was clobbering the applier's own default
// and forcing every iter to hit the throttled ollama_cloud account.
const APPLIER_MODEL = process.env.LOOP_APPLIER_MODEL;
const APPLIER_PROVIDER = process.env.LOOP_APPLIER_PROVIDER;
const TARGETS = (process.env.LOOP_TARGETS ?? "crates/queryd/src/service.rs,crates/gateway/src/main.rs,crates/gateway/src/v1/mod.rs")
.split(",").map(s => s.trim()).filter(Boolean);
const FORENSIC = process.env.LH_SCRUM_FORENSIC ?? `${REPO}/docs/SCRUM_FORENSIC_PROMPT.md`;
const PROPOSAL = process.env.LH_SCRUM_PROPOSAL ?? `${REPO}/docs/SCRUM_FIX_WAVE.md`;
const LOOP_ID = `loop_${Date.now().toString(36)}`;
const JOURNAL = `${REPO}/data/_kb/autonomous_loops.jsonl`;
interface IterResult {
iter: number;
scrum_reviews_added: number;
applier_outcomes: Record<string, number>;
commits_landed: number;
commit_shas: string[];
build_status: "green" | "red" | "unknown";
duration_ms: number;
}
function log(msg: string) {
const ts = new Date().toISOString().slice(11, 19);
console.log(`[loop ${LOOP_ID} ${ts}] ${msg}`);
}
function runCmd(cmd: string, args: string[], env: Record<string, string> = {}): Promise<{ code: number; stdout: string; stderr: string }> {
return new Promise((resolve) => {
const child = spawn(cmd, args, { cwd: REPO, env: { ...process.env, ...env } });
let stdout = "", stderr = "";
child.stdout.on("data", (d) => { stdout += d; process.stdout.write(d); });
child.stderr.on("data", (d) => { stderr += d; process.stderr.write(d); });
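// Handle spawn failures (e.g. binary not found); an "error" event with no
// listener would otherwise be thrown as an uncaught exception.
child.on("error", (err) => resolve({ code: -1, stdout, stderr: stderr + String(err) }));
child.on("close", (code) => resolve({ code: code ?? -1, stdout, stderr }));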
});
}
async function countLines(path: string): Promise<number> {
if (!existsSync(path)) return 0;
const text = await readFile(path, "utf8");
return text.split("\n").filter(Boolean).length;
}
async function gitHeadSha(): Promise<string> {
const r = await runCmd("git", ["rev-parse", "HEAD"]);
return r.stdout.trim();
}
async function commitsSince(baseSha: string): Promise<string[]> {
const r = await runCmd("git", ["log", "--oneline", `${baseSha}..HEAD`]);
return r.stdout.trim().split("\n").filter(Boolean);
}
async function cargoCheckGreen(): Promise<boolean> {
log("cargo check --workspace …");
const r = await runCmd("cargo", ["check", "--workspace", "--quiet"]);
return r.code === 0;
}
async function postObserver(payload: object) {
try {
const r = await fetch(`${OBSERVER}/event`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(payload),
signal: AbortSignal.timeout(5000),
});
if (!r.ok) log(`observer POST returned ${r.status}`);
} catch (e: any) {
log(`observer POST failed: ${e.message}`);
}
}
async function runIter(iter: number, baseSha: string): Promise<IterResult> {
const t0 = Date.now();
log(`══ iter ${iter} start (base ${baseSha.slice(0, 8)}) targets=${TARGETS.length}`);
const reviewsBefore = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
const applyBefore = await countLines(`${REPO}/data/_kb/auto_apply.jsonl`);
log(`scrum_master_pipeline.ts → ${TARGETS.length} files`);
await runCmd("bun", ["run", "tests/real-world/scrum_master_pipeline.ts"], {
LH_SCRUM_FILES: TARGETS.join(","),
LH_SCRUM_FORENSIC: FORENSIC,
LH_SCRUM_PROPOSAL: PROPOSAL,
});
log(`scrum_applier.ts COMMIT=1 MIN_CONF=${MIN_CONF} files=${TARGETS.length}`);
// Only forward model/provider when explicitly overridden — otherwise
// let scrum_applier.ts use its own defaults (Grok 4.1 fast on openrouter).
const applierEnv: Record<string, string> = {
LH_APPLIER_COMMIT: "1",
LH_APPLIER_MIN_CONF: MIN_CONF,
LH_APPLIER_MAX_FILES: String(TARGETS.length),
LH_APPLIER_BRANCH: BRANCH,
// Constrain applier to THIS iter's targets so it patches what we
// just reviewed instead of the highest-confidence file from history.
LH_APPLIER_FILES: TARGETS.join(","),
};
if (APPLIER_MODEL) applierEnv.LH_APPLIER_MODEL = APPLIER_MODEL;
if (APPLIER_PROVIDER) applierEnv.LH_APPLIER_PROVIDER = APPLIER_PROVIDER;
await runCmd("bun", ["run", "tests/real-world/scrum_applier.ts"], applierEnv);
const reviewsAfter = await countLines(`${REPO}/data/_kb/scrum_reviews.jsonl`);
const applyAfterText = existsSync(`${REPO}/data/_kb/auto_apply.jsonl`)
? await readFile(`${REPO}/data/_kb/auto_apply.jsonl`, "utf8")
: "";
const applyRows = applyAfterText.split("\n").filter(Boolean).slice(applyBefore);
const outcomes: Record<string, number> = {};
for (const line of applyRows) {
try {
const o = JSON.parse(line);
outcomes[o.action ?? "?"] = (outcomes[o.action ?? "?"] ?? 0) + 1;
} catch { /* skip malformed */ }
}
const commitShas = await commitsSince(baseSha);
const buildStatus = commitShas.length > 0 ? (await cargoCheckGreen() ? "green" : "red") : "unknown";
const result: IterResult = {
iter,
scrum_reviews_added: reviewsAfter - reviewsBefore,
applier_outcomes: outcomes,
commits_landed: commitShas.length,
commit_shas: commitShas.map(s => s.split(" ")[0]),
build_status: buildStatus,
duration_ms: Date.now() - t0,
};
log(`iter ${iter} done — reviews+${result.scrum_reviews_added} commits=${result.commits_landed} build=${buildStatus} (${(result.duration_ms / 1000).toFixed(1)}s)`);
await postObserver({
source: "autonomous_loop",
loop_id: LOOP_ID,
event_kind: "iteration_complete",
iter,
targets: TARGETS,
success: buildStatus !== "red",
scrum_reviews_added: result.scrum_reviews_added,
applier_outcomes: result.applier_outcomes,
commits_landed: result.commits_landed,
commit_shas: result.commit_shas,
build_status: buildStatus,
duration_ms: result.duration_ms,
ts: new Date().toISOString(),
});
await appendFile(JOURNAL, JSON.stringify({ loop_id: LOOP_ID, ...result, ts: new Date().toISOString() }) + "\n");
return result;
}
async function main() {
log(`autonomous loop starting · branch=${BRANCH} max_iters=${MAX_ITERS} push=${PUSH}`);
log(`targets: ${TARGETS.join(", ")}`);
const branchR = await runCmd("git", ["branch", "--show-current"]);
if (branchR.stdout.trim() !== BRANCH) {
log(`ERROR: on branch ${branchR.stdout.trim()}, expected ${BRANCH}. Refusing to run.`);
process.exit(1);
}
let consecutiveZero = 0;
for (let iter = 1; iter <= MAX_ITERS; iter++) {
const baseSha = await gitHeadSha();
const result = await runIter(iter, baseSha);
if (PUSH && result.commits_landed > 0) {
log(`git push origin ${BRANCH}`);
const pushR = await runCmd("git", ["push", "origin", BRANCH]);
if (pushR.code !== 0) log(`push failed (continuing): ${pushR.stderr.slice(0, 200)}`);
}
consecutiveZero = result.commits_landed === 0 ? consecutiveZero + 1 : 0;
if (consecutiveZero >= 2) {
log(`STOP: 2 consecutive iters with 0 commits. Loop converged or stuck.`);
break;
}
}
log(`loop ${LOOP_ID} complete. Journal: ${JOURNAL}`);
}
main().catch((e) => {
log(`FATAL: ${e.message}`);
process.exit(1);
});
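
Two pieces of the loop above are pure logic and can be exercised without git, cargo, or the observer: tallying only the audit rows appended during an iteration, and the stop condition. The sketch below restates them under assumptions: `tallyNewRows` and `iterationsRun` are hypothetical names, and the sample JSONL rows are made up for illustration.

```typescript
// Hypothetical helpers; neither name appears in the original file.

// Count `action` values among JSONL rows appended after `rowsBefore`,
// skipping malformed rows, as runIter does for auto_apply.jsonl.
function tallyNewRows(jsonl: string, rowsBefore: number): Record<string, number> {
  const outcomes: Record<string, number> = {};
  for (const line of jsonl.split("\n").filter(Boolean).slice(rowsBefore)) {
    try {
      const action: string = JSON.parse(line).action ?? "?";
      outcomes[action] = (outcomes[action] ?? 0) + 1;
    } catch {
      // skip malformed rows
    }
  }
  return outcomes;
}

// Number of iterations that run before the loop stops: either maxIters
// is reached or zeroLimit consecutive iterations land 0 commits.
function iterationsRun(commitsPerIter: number[], maxIters: number, zeroLimit = 2): number {
  let consecutiveZero = 0;
  let ran = 0;
  for (const commits of commitsPerIter.slice(0, maxIters)) {
    ran++;
    consecutiveZero = commits === 0 ? consecutiveZero + 1 : 0;
    if (consecutiveZero >= zeroLimit) break;
  }
  return ran;
}

// Made-up sample data: one pre-existing row, then four new ones.
const sample = '{"action":"old"}\n{"action":"committed"}\nnot json\n{"x":1}\n{"action":"committed"}\n';
console.log(tallyNewRows(sample, 1)); // { committed: 2, "?": 1 }
console.log(iterationsRun([1, 0, 0, 3], 5)); // 3
```

Slicing by the pre-iteration row count keeps earlier runs out of the tally, and a single productive iteration resets the zero-commit counter.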


@@ -35,6 +35,12 @@ const AUDIT_LOG = `${REPO}/data/_kb/auto_apply.jsonl`;
 const MIN_CONF = Number(process.env.LH_APPLIER_MIN_CONF ?? 90);
 const MAX_FILES = Number(process.env.LH_APPLIER_MAX_FILES ?? 5);
 const COMMIT = process.env.LH_APPLIER_COMMIT === "1";
+// LH_APPLIER_FILES — comma-separated repo-relative paths. When set,
+// constrains eligible reviews to ONLY those files. Used by the autonomous
+// loop so the applier patches what scrum just reviewed in this iter,
+// instead of pulling the highest-confidence file from global review history.
+const TARGET_FILES = (process.env.LH_APPLIER_FILES ?? "")
+.split(",").map(s => s.trim()).filter(Boolean);
 // Default patch-emitter model — qwen3-coder:480b is the coding specialist
 // in the scrum ladder (rung 2). Swapped in from kimi-k2:1t after 2026-04-24
 // data showed kimi-k2:1t produces architectural patches that cascade across
@@ -42,7 +48,13 @@ const COMMIT = process.env.LH_APPLIER_COMMIT === "1";
 // for targeted code changes and tends to stay within the mechanical-patch
 // constraint the prompt asks for. LLM Team's /api/run?mode=patch would be
 // the ideal choice but that mode isn't registered in llm_team_ui.py yet.
-const MODEL = process.env.LH_APPLIER_MODEL ?? "qwen3-coder:480b";
+// Default patch emitter swapped to OpenRouter Grok 4.1 fast (2026-04-25)
+// after observing the prior default (ollama_cloud::qwen3-coder:480b) sit
+// at 429 throttle and never produce patches. Grok 4.1 fast: $0.20/$0.50
+// per M, 2M ctx, proven to emit precise structured patches in observer
+// hand-review tests. Override with LH_APPLIER_MODEL + LH_APPLIER_PROVIDER.
+const MODEL = process.env.LH_APPLIER_MODEL ?? "x-ai/grok-4.1-fast";
+const PROVIDER = (process.env.LH_APPLIER_PROVIDER ?? "openrouter") as "ollama_cloud" | "openrouter" | "ollama";
 const BRANCH = process.env.LH_APPLIER_BRANCH ?? `scrum/auto-apply-${Date.now().toString(36)}`;
 // Deny-list — anything whose path starts with one of these is skipped
@@ -200,7 +212,7 @@ ${source.slice(0, 14000)}
 Emit ONLY the JSON object.`;
-const r = await chat({ provider: "ollama_cloud", model: MODEL, prompt, max_tokens: 2500 });
+const r = await chat({ provider: PROVIDER, model: MODEL, prompt, max_tokens: 2500 });
 if (r.error || !r.content) return [];
 // Strip markdown fences if model wrapped the JSON.
@@ -334,10 +346,14 @@ async function main() {
 log(`loaded ${reviews.size} latest reviews`);
 const eligible = [...reviews.values()].filter(r =>
-passesConfidenceGate(r) && passesDenyList(r.file)
+passesConfidenceGate(r) && passesDenyList(r.file) &&
+(TARGET_FILES.length === 0 || TARGET_FILES.includes(r.file))
 ).sort((a, b) => (b.confidence_avg ?? 0) - (a.confidence_avg ?? 0));
-log(`${eligible.length} pass confidence gate + deny-list`);
+if (TARGET_FILES.length > 0) {
+log(`LH_APPLIER_FILES set — constrained to ${TARGET_FILES.length} target file(s): ${TARGET_FILES.join(", ")}`);
+}
+log(`${eligible.length} pass confidence gate + deny-list${TARGET_FILES.length > 0 ? " + target filter" : ""}`);
 log(`taking top ${Math.min(MAX_FILES, eligible.length)} by confidence`);
 // Establish pre-run warning baseline so post-patch cargo check can
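
The effect of the new `LH_APPLIER_FILES` filter can be illustrated in isolation. This is a rough sketch under stated assumptions: `Review`, `parseTargets`, and `selectEligible` are hypothetical names, the confidence gate is assumed to be a simple threshold on `confidence_avg` (the real `passesConfidenceGate` is not shown in this diff), the deny-list is omitted, and the sample reviews are invented.

```typescript
// Sketch only: Review, parseTargets, and selectEligible are hypothetical
// names. The real applier also applies a deny-list, omitted here.
interface Review {
  file: string;
  confidence_avg?: number;
}

// Split a comma-separated env value into trimmed, non-empty paths.
function parseTargets(raw: string | undefined): string[] {
  return (raw ?? "").split(",").map((s) => s.trim()).filter(Boolean);
}

// An empty target list means "no constraint", matching the diff's
// TARGET_FILES.length === 0 short-circuit.
function selectEligible(reviews: Review[], targets: string[], minConf: number, maxFiles: number): Review[] {
  return reviews
    .filter((r) => (r.confidence_avg ?? 0) >= minConf) // assumed gate
    .filter((r) => targets.length === 0 || targets.includes(r.file))
    .sort((a, b) => (b.confidence_avg ?? 0) - (a.confidence_avg ?? 0))
    .slice(0, maxFiles);
}

// Invented sample data.
const reviews: Review[] = [
  { file: "crates/a/src/x.rs", confidence_avg: 97 },
  { file: "crates/b/src/y.rs", confidence_avg: 91 },
  { file: "crates/c/src/z.rs", confidence_avg: 88 },
];
const targets = parseTargets(" crates/b/src/y.rs, ,crates/c/src/z.rs ");
console.log(selectEligible(reviews, targets, 90, 5).map((r) => r.file));
// [ "crates/b/src/y.rs" ]
```

Without the target filter the highest-confidence review (`x.rs` here) would win regardless of what the current iteration reviewed, which is the behavior the diff's comments describe fixing.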

File diff suppressed because it is too large.