Compare commits


4 Commits

Author SHA1 Message Date
root
a663698571 Item 3 — geo-filtered playbook boost; diagnostic logging
ROOT CAUSE (found via instrumentation, not hunch):
After a 20-scenario corpus batch, only 6/40 successful (role, city)
combos ever triggered playbook_memory citations on subsequent runs.
Added `playbook_boost:` tracing::info! line in vectord::service to log
boost map size vs candidate pool vs match count. One query revealed:

  boosts=170 sources=50 parsed=50 matched=0

170 endorsed workers came back from compute_boost_for — but zero were
in the 50-candidate Toledo pool. The boost map was pulling globally-
ranked semantic neighbors (top-100 playbooks across ALL cities),
dominated by Kansas City / Chicago / Detroit forklift playbooks the
Toledo SQL filter would never admit. The mechanism was correct at the
per-playbook level; the problem was pool intersection.

FIX (surgical, not cap-tuning):
- playbook_memory::compute_boost_for_filtered(): accepts optional
  (city, state) filter. When set, skips playbooks from other geos
  BEFORE cosine-ranking, so top-k is within the target city.
- Backwards-compatible: compute_boost_for() calls the filtered variant
  with None — existing callers unchanged.
- service::hybrid_search(): extracts target (city, state) from the
  executor's SQL filter via a small parser (extract_target_geo),
  passes to compute_boost_for_filtered.

VERIFIED:
  Before fix: boosts=170 sources=50 parsed=50 matched=0   (0% hit)
  After fix:  boosts=36  sources=50 parsed=50 matched=11  (22% hit)
Top-k=10 now has 7/10 boosted workers with 2-3 citations each.
Boost values 0.075-0.113 on cosine scores 0.67-0.74 — meaningful
reorder without saturation.
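Concretely: a 0.68-cosine candidate carrying a 0.11 boost lands at 0.79,
ahead of every unboosted 0.74 hit, while a 0.075 boost on a 0.67 score
(0.745) only edges past near-ties. Reorder, not takeover.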

scripts/kb_measure.py:
Aggregator that reads data/_kb/*.jsonl and playbooks/*/results.json,
reports fill rate, citation density, recommender confidence trend,
and zero-citation-ok combos (item 3 target signal). Used to measure
before/after on bigger batches.

Diagnostic logging stays — the class of "boosts computed but not
matched" bug can recur if the SQL filter format ever drifts, and
without the counter it's invisible. Every hybrid_search with
use_playbook_memory=true now logs its boost stats.
2026-04-20 21:35:04 -05:00
root
330cb90f99 Lift k cap, drop ornamental reason field, scenario generator
ITEM 1 — k CAP + REASON FIELD
The hybrid_search default k was hard-coded to 10. For multi-fill events
(5× expansion, 4× emergency) that's pool=10 → propose 5-of-10, half
the candidates become the answer with no room for rejection. Executor
prompt now instructs k to scale with target_count: k = max(count*5, 20),
cap 80. Default helper bumped 10 → 20.
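As a helper it would read roughly like this (illustrative name only; the
real change lives in the executor prompt plus the bumped hybridSearch
default):

  const adaptiveK = (targetCount: number) =>
    Math.min(Math.max(targetCount * 5, 20), 80);
  // adaptiveK(1) → 20, adaptiveK(8) → 40, adaptiveK(20) → 80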

Fill.reason dropped from required to optional. Nothing downstream ever
consumed it — resolveWorkerIds, sealSale, retrospective all use
candidate_id and name. Models loved to write 100-150 char justifications
per fill; on 4+ fills that blew the JSON budget before the structure
closed. First test run after this change: FIRST EVER 5/5 on the
Riverfront Steel scenario, 13 total turns across 5 events. The event
that failed last run (emergency 4×Loader with truncated reason-field
continuation) now clears in 2 turns.

Progression:
  mistral baseline:                        0/5
  qwen3.5 + continuation + think:false:    4/5
  qwen3.5 + k=20 + no-reason:              5/5 ✓

ITEM 2 — SCENARIO GENERATOR (NOT YET TESTED E2E)
tests/multi-agent/gen_scenarios.ts emits N deterministic ScenarioSpecs
with varied clients (15 companies), cities (20 Midwest cities known
to exist in workers_500k), role mixes (14 industrial staffing roles,
weighted realistic), and event sequences. Each gets a unique sig_hash
so the KB populates with distinct neighbor signatures.

scripts/run_kb_batch.sh runs all generated specs sequentially against
scenario.ts, logs per-scenario outcomes, and reports KB state at the
end. Each run takes ~2-4min; 20-30 scenarios = 1-2hr unattended.

Next: test the generator+batch on a small N (3-5) to verify KB
populates correctly and pathway recommendations start getting neighbor
signal instead of cold-starts. Then item 3 (Rust re-weighting of
hybrid_search by playbook_memory success).
2026-04-20 20:31:34 -05:00
root
9c1400d738 Phase 22 — Internal Knowledge Library (KB)
Meta-layer over Phase 19 playbook_memory. Phase 19 answers "which
WORKERS worked for this event"; KB answers "which CONFIG worked for
this playbook signature" — model choice, budget hints, pathway notes,
error corrections.

tests/multi-agent/kb.ts:
- computeSignature(): stable sha256 hash of the (kind, role, count,
  city, state) tuple sequence. Same scenario shape → same sig (see
  the sketch after this list).
- indexRun(): extracts sig, embeds spec digest via sidecar, appends
  outcome record, upserts signature to data/_kb/signatures.jsonl.
- findNeighbors(): cosine-ranks the k most-similar signatures from
  prior runs for a target spec.
- detectErrorCorrections(): scans outcomes for same-sig fail→succeed
  pairs, diffs the model set, logs to error_corrections.jsonl.
- recommendFor(): feeds target digest + k-NN neighbors + recent
  corrections to the overview model, gets back a structured JSON
  recommendation (top_models, budget_hints, pathway_notes), appends
  to pathway_recommendations.jsonl. JSON-shape constrained so the
  executor can inherit it mechanically.
- loadRecommendation(): at scenario start, pulls newest rec matching
  this sig (or nearest).
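A minimal sketch of the signature computation as described above (field
order and canonicalization are assumptions; kb.ts is the source of truth):

  import { createHash } from "node:crypto";

  type Ev = { kind: string; role: string; count: number; city: string; state: string };

  // Same ordered (kind, role, count, city, state) tuple sequence →
  // same hex sig, so scenario SHAPE is what gets compared.
  function computeSignature(events: Ev[]): string {
    const canonical = events
      .map(e => [e.kind, e.role, e.count, e.city, e.state].join("|"))
      .join(";");
    return createHash("sha256").update(canonical).digest("hex");
  }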

scenario.ts:
- Reads KB recommendation at startup (alongside prior lessons).
- Injects pathway_notes into guidanceFor() executor context.
- After retrospective, indexes the run + synthesizes next rec.
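In scenario.ts terms the cycle is roughly (call shapes assumed, not
verbatim from the diff):

  const rec = await kb.loadRecommendation(spec);  // scenario start
  // … events run, retrospective completes …
  await kb.indexRun(spec, results);               // append outcome + sig
  await kb.recommendFor(nextSpec);                // synthesize next rec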

Cold-start behavior: first run with no history writes a low-confidence
"no prior data" rec so the signal that something was attempted is
captured. Second run gets "low confidence, 0 neighbors" until a third
distinct sig gives the embedder something to compare against — hence
the upcoming scenario generator.

VERIFIED:
- data/_kb/ populated after one scenario run: 1 outcome (sig=4674…,
  4/5 ok, 16 turns total), 1 signature, 2 recs (cold + post-run).
- Recommendation JSON-parsed cleanly from gpt-oss:20b overview model.

PRD Phase 22 added with file layout, cycle description, and the
rationale for file-based MVP → Rust port progression that matches
how Phase 21 primitives shipped.

What's NOT here yet (batched follow-ups per J's request, tested
between each):
- Lift the k=10 hybrid_search cap to adaptive k=max(count*5, 20)
- Scenario generator to bulk-populate KB with varied signatures
- Rust re-weighting: push playbook_memory success signal INTO
  hybrid_search scoring, not just post-hoc boost
2026-04-20 20:27:12 -05:00
root
0c4868c191 qwen3.5 executor + continuation primitive + think:false
Three coupled fixes that together turned the Riverfront Steel scenario
from 0/5 (mistral) to 4/5 (qwen3.5) with T3 flagging real staffing
concerns rather than linter advice.

MODEL SWAP
- Executor: mistral → qwen3.5:latest (9.7B, 262K ctx, thinking).
  mistral's decoder emitted malformed JSON on complex SQL filters
  regardless of prompt; J called it — stop using mistral.
- Reviewer: qwen2.5 → qwen3:latest (40K ctx)
- Applied to scenario.ts, orchestrator.ts, network_proving.ts,
  run_e2e_rated.ts

CONTINUATION PRIMITIVE (agent.ts)
- generateContinuable(): empty-response → geometric backoff retry;
  truncated-JSON → continue from partial as scratchpad; bounded by
  budget cap + max_continuations. No more "bump max_tokens until it
  stops truncating" tourniquet.
- generateTreeSplit(): map-reduce for oversized input corpora with
  running scratchpad digest, reduce pass for final synthesis.
- Empty text no longer throws — it's a signal to continuable that
  thinking ate the budget.

think:false FOR HOT PATH
- qwen3.5 burned ~650 tokens of hidden thinking for trivial JSON
  emission. For executor/reviewer/draft: think:false. For T3/T4/T5
  overseers: thinking stays on (that's the point).
- Sidecar generate endpoint accepts `think` bool, passes through to
  Ollama's /api/generate.

VERIFIED OUTCOMES
Riverfront Steel 2026-04-21, qwen3.5+continuable+think:false:
  08:00 baseline_fill  3/3  4 turns
  10:30 recurring      2/2  3 turns (1 playbook citation)
  12:15 expansion      0/5  drift-aborted (5-fill orchestration
                            problem, separate work)
  14:00 emergency      4/4  3 turns (1 citation)
  15:45 misplacement   1/1  3 turns
  → T3 caught Patrick Ross double-booking across events
  → T3 flagged forklift cert drift on the event that failed
  → Cross-day lesson proposed "maintain buffer of ≥3 emergency
    candidates, pre-fetch certs for expansion, booking system
    cross-check" — real staffing advice, not generic linter output

PRD PHASE 21 rewritten to reflect the actual primitive shape (two-
call map-reduce with scratchpad glue) instead of the tourniquet
approach originally documented. Rust port queued for next sprint.

scripts/ab_t3_test.sh: A/B harness that chains B→C→D runs and emits
tests/multi-agent/playbooks/ab_scorecard.json.
2026-04-20 20:19:02 -05:00
15 changed files with 1468 additions and 64 deletions

View File

@@ -22,9 +22,9 @@
"t1_hot": {
"purpose": "Per tool call — SQL generation, hybrid_search, sql(). Runs 50-200 times per scenario. Latency-sensitive.",
"kind": "local_fast",
"primary": { "model": "mistral:latest", "provider": "ollama_local", "context_window": 32768 },
"primary": { "model": "qwen3.5:latest", "provider": "ollama_local", "context_window": 262144 },
"fallback": { "model": "qwen2.5:latest", "provider": "ollama_local", "context_window": 32768 },
"max_tokens": 800,
"max_tokens": 1000,
"temperature": 0.3,
"never_route_cloud": true,
"context_budget": {
@@ -34,14 +34,14 @@
"safety_margin": 2000,
"overflow_policy": "summarize_oldest_tool_results_via_t3"
},
"rationale": "Mistral produces valid JSON reliably. Qwen2.5 is the consensus reviewer. Known flakiness on 5-fill + misplacement events — do NOT mask by upgrading; route to T3 for post-hoc review instead."
"rationale": "qwen3.5:latest is a 9.7B thinking model with 262K context that emits clean JSON (verified 2026-04-21 after mistral A/B wipeout). Thinking budget requires max_tokens ≥ 400; we hold 1000 to keep propose_done payloads intact. qwen2.5 stays as fallback."
},
"t2_review": {
"purpose": "Per step consensus — executor ↔ reviewer loop critique. 5-14 calls per event.",
"kind": "local_balanced",
"primary": { "model": "qwen2.5:latest", "provider": "ollama_local", "context_window": 32768 },
"fallback": { "model": "qwen3:latest", "provider": "ollama_local", "context_window": 40960 },
"max_tokens": 600,
"primary": { "model": "qwen3:latest", "provider": "ollama_local", "context_window": 40960 },
"fallback": { "model": "qwen2.5:latest", "provider": "ollama_local", "context_window": 32768 },
"max_tokens": 800,
"temperature": 0.3,
"never_route_cloud": true,
"context_budget": {
@@ -72,7 +72,8 @@
"t4_strategic": {
"purpose": "Daily playbook board re-ranking, weekly gap audit, pattern discovery across accumulated playbooks. 1-10 calls per day.",
"kind": "thinking_cloud_large",
"primary": { "model": "qwen3.5:397b", "provider": "ollama_cloud", "context_window": 131072 },
"primary": { "model": "kimi-k2.6", "provider": "ollama_cloud", "context_window": 200000 },
"secondary": { "model": "qwen3.5:397b", "provider": "ollama_cloud", "context_window": 131072 },
"fallback": { "model": "glm-4.7", "provider": "ollama_cloud", "context_window": 131072 },
"local_fallback": { "model": "gpt-oss:20b", "provider": "ollama_local", "context_window": 131072 },
"max_tokens": 2000,
@@ -108,6 +109,14 @@
}
},
"embeddings": {
"_description": "Vector embedding models for hybrid search + playbook_memory similarity. Always local — embedding calls are too high-volume for cloud. qwen3-embedding is the new primary (2026-04-21); nomic-embed-text-v2-moe is the MoE alternative for corpus-scale embedding; original nomic-embed-text stays as legacy fallback while vectord indexes are being re-embedded.",
"primary": { "model": "qwen3-embedding", "provider": "ollama_local", "dim": 1024, "context_window": 32768 },
"alternate": { "model": "nomic-embed-text-v2-moe", "provider": "ollama_local", "dim": 768, "context_window": 2048, "notes": "MoE variant — better for long documents, slower per-call, roughly matches nomic-embed-text on short text." },
"legacy": { "model": "nomic-embed-text:latest", "provider": "ollama_local", "dim": 768, "context_window": 8192, "notes": "Used by existing vectord indexes. Do NOT switch live until indexes are rebuilt, or hybrid search recall will crater." },
"migration_plan": "Re-embed workers_500k (and all production indexes) with qwen3-embedding, keep the old index under a _v1 suffix for rollback, flip the profile pointer only after recall benchmarks match or exceed legacy. Touches vectord::agent + autotune."
},
"context_management": {
"_description": "Rule zero: NEVER call a model with more tokens than its context_window minus safety_margin. Every call goes through the budget checker first. If over budget → chunk, summarize, or escalate. This is the stability floor.",
"token_estimator": {

View File

@@ -213,6 +213,28 @@ impl PlaybookMemory {
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
) -> HashMap<(String, String, String), BoostEntry> {
self.compute_boost_for_filtered(query_embedding, top_k_playbooks, base_weight, None).await
}
/// Same as `compute_boost_for` but only considers playbooks whose
/// (city, state) matches the caller's target. This is the honest
/// fix for the "boosts=170 matched=0" pathology: globally-ranked
/// semantic neighbors include playbooks from every city the query
/// could never reach via its SQL filter. When the caller knows the
/// target geo, restricting here collapses noise and raises the
/// endorsed-worker hit rate. Pass None for the original behavior.
///
/// 2026-04-21 — added after a corpus-density batch of 25 runs
/// showed only 6/40 successful (role, city) combos ever triggered
/// a citation on subsequent runs. Diagnostic logging proved the
/// boost map had 170 keys but the 50-candidate pool matched 0.
pub async fn compute_boost_for_filtered(
&self,
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
target_geo: Option<(&str, &str)>,
) -> HashMap<(String, String, String), BoostEntry> {
let state = self.state.read().await;
let entries = state.entries.clone();
@@ -225,8 +247,18 @@ impl PlaybookMemory {
drop(state);
// Brute-force cosine. Empty / missing embeddings just skip.
// When target_geo is set, pre-filter to matching playbooks BEFORE
// cosine sort — that way top-k is within the city, not across
// all cities.
let mut scored: Vec<(f32, &PlaybookEntry)> = entries
.iter()
.filter(|e| match (target_geo, &e.city, &e.state) {
(None, _, _) => true,
(Some((tc, ts)), Some(ec), Some(es)) => {
ec.eq_ignore_ascii_case(tc) && es.eq_ignore_ascii_case(ts)
}
_ => false,
})
.filter_map(|e| e.embedding.as_ref().map(|v| (cosine(query_embedding, v), e)))
.collect();
scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));

View File

@@ -803,22 +803,61 @@ async fn hybrid_search(
// set. Additive boost on the existing vector score, then re-sort.
if req.use_playbook_memory {
let boost_k = req.playbook_memory_k.unwrap_or(playbook_memory::DEFAULT_TOP_K_PLAYBOOKS);
// Extract target (city, state) from the SQL filter so
// compute_boost_for can skip playbooks from other cities that
// would never intersect the candidate pool. The executor's
// filter shape is stable: `... city = 'Toledo' AND state = 'OH' ...`.
// Case-insensitive match, tolerant of single quotes and spaces.
let target_geo = req.sql_filter.as_deref().and_then(extract_target_geo);
// We embedded the question as `qv` above — reuse it for the
// playbook similarity lookup so we don't double-pay Ollama.
- let boosts = state.playbook_memory.compute_boost_for(&qv, boost_k, 0.5).await;
+ let boosts = state.playbook_memory
+     .compute_boost_for_filtered(
+         &qv,
+         boost_k,
+         0.5,
+         target_geo.as_ref().map(|(c, s)| (c.as_str(), s.as_str())),
+     )
+     .await;
// Diagnostics for Phase 19 boost pipeline. Logged so item 3
// investigation has ground truth:
// - boosts.len(): how many (city,state,name) keys surfaced for
// this query (0 = playbook_memory found nothing semantically
// similar to the question).
// - parsed: how many candidate chunks parsed cleanly into
// (name,city,state) via parse_worker_chunk.
// - matched: how many parsed keys matched an entry in boosts.
// 2026-04-21 — 20-scenario batch showed 34/40 ok combos never
// got a citation. These counters pin whether the gap is on the
// SIMILARITY side (boosts empty) or the MATCH side (parsed vs
// boosted keys mismatch — e.g. name format drift).
let mut parsed_count = 0usize;
let mut matched_count = 0usize;
for src in sources.iter_mut() {
// Parse "{Name} — {Role} in {City}, {State}. …" chunk. Being
// defensive: chunks from other datasets may not follow this
// exact shape, so absent fields just skip the boost.
if let Some((name, city, state)) = parse_worker_chunk(&src.chunk_text) {
parsed_count += 1;
let key = (city, state, name);
if let Some(entry) = boosts.get(&key) {
src.score += entry.boost;
src.playbook_boost = entry.boost;
src.playbook_citations = entry.citations.clone();
matched_count += 1;
}
}
}
tracing::info!(
"playbook_boost: boosts={} sources={} parsed={} matched={} target_geo={:?} (query='{}')",
boosts.len(),
sources.len(),
parsed_count,
matched_count,
target_geo,
req.question.chars().take(60).collect::<String>(),
);
// Re-rank: boosted scores can flip ordering.
sources.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
// Finally trim to the caller's requested top_k — we pulled fetch_k
@@ -2059,6 +2098,54 @@ struct LanceRecallQuery {
/// "{Name} — {Role} in {City}, {State}. Skills: …".
/// Returns None if the chunk doesn't match the shape; callers simply
/// skip the boost for that hit.
/// Pull (city, state) out of a SQL filter that uses
/// `city = 'Toledo' AND state = 'OH'` style equality. Returns None if
/// either is missing — the caller keeps the original global boost map
/// behavior (no geo narrowing). Case-insensitive on the column name
/// so `CITY=` or `City =` also work.
fn extract_target_geo(sql_filter: &str) -> Option<(String, String)> {
fn grab_eq(src: &str, col: &str) -> Option<String> {
// Very small parser, resilient enough for the executor's
// filter shapes. Matches `col = 'value'` or `col='value'` with
// case-insensitive column name.
let lower = src.to_ascii_lowercase();
let col_lower = col.to_ascii_lowercase();
let mut search_from = 0usize;
while let Some(off) = lower[search_from..].find(&col_lower) {
let pos = search_from + off;
// Require word boundary before the column name so "city"
// inside "civilian_rank" doesn't false-match.
let prior_ok = pos == 0
|| !lower.as_bytes()[pos - 1].is_ascii_alphanumeric()
&& lower.as_bytes()[pos - 1] != b'_';
let after = pos + col_lower.len();
if !prior_ok || after >= src.len() {
search_from = pos + col_lower.len();
continue;
}
// Walk past whitespace, require '='.
let mut i = after;
while i < src.len() && src.as_bytes()[i] == b' ' { i += 1; }
if i >= src.len() || src.as_bytes()[i] != b'=' { search_from = pos + col_lower.len(); continue; }
i += 1;
while i < src.len() && src.as_bytes()[i] == b' ' { i += 1; }
// Value is single-quoted literal; extract until the next '.
if i >= src.len() || src.as_bytes()[i] != b'\'' { search_from = pos + col_lower.len(); continue; }
i += 1;
let start = i;
while i < src.len() && src.as_bytes()[i] != b'\'' { i += 1; }
if i > start {
return Some(src[start..i].to_string());
}
search_from = pos + col_lower.len();
}
None
}
let city = grab_eq(sql_filter, "city")?;
let state = grab_eq(sql_filter, "state")?;
Some((city, state))
}
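// Illustrative behavior (editor's sketch, not lines from the commit):
//   extract_target_geo("role = 'Loader' AND city = 'Toledo' AND state = 'OH'")
//     == Some(("Toledo".to_string(), "OH".to_string()))
//   extract_target_geo("role = 'Loader'") == None (caller keeps the
//   global, un-narrowed boost map)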
fn parse_worker_chunk(chunk: &str) -> Option<(String, String, String)> {
// "Name — Role in City, ST. …" → split on "—" then " in " then ","
let (name_part, rest) = chunk.split_once('—')?;

View File

@@ -392,26 +392,68 @@ Five-tier routing declared in `config/models.json`. Hot path (T1/T2) stays on lo
T3 checkpoints + cross-day lessons are wired. Lessons archive to `data/_playbook_lessons/` and load back at next scenario start as `prior_lessons` in executor context.
- ### Phase 21: Context Stability & Chunking Pipeline
+ ### Phase 21: Scratchpad + Tree-Split Continuation
- **Why this is a phase and not an optimization:** when playbooks accumulate into the thousands, naive concatenation blows any model's context window. LLM Team hit this exact issue running multi-model ranking — context got lost silently, rankings degraded, and there was no pipeline to catch it. We cannot trust cloud inference to save us: bigger context window just raises the cliff. The stable answer is a chunking + caching layer that runs BEFORE any agent call.
+ **Why this is a phase and not an optimization:** bumping `max_tokens` until a response stops truncating is a tourniquet — J called this out explicitly. As playbooks accumulate into the hundreds and responses grow, eventually SOME request will exceed SOME model's window, and we can't solve it by raising a number. The stable answer is two primitives that let us handle arbitrary-size work without losing context: a scratchpad that glues multi-call responses together, and a tree split that shards oversized inputs and reduces them back.
**Two primitives (WIRED 2026-04-21 in `tests/multi-agent/agent.ts`):**
1. **`generateContinuable()`** — handles OUTPUT overflow. Calls the model; checks structural completeness (for JSON: matched braces + JSON.parse success; for text: non-empty). If incomplete, calls again with "continue from here" + the partial response as scratchpad. Up to `max_continuations` times. No `max_tokens` tuning needed — if thinking ate the initial budget, continuation picks up the slack.
2. **`generateTreeSplit()`** — handles INPUT overflow. Caller passes an array of shards (semantic chunks of the corpus). For each shard: map call with running scratchpad digest. Final reduce call produces the answer. Scratchpad truncates oldest content if it approaches its own budget. If a single shard still overflows, `assertContextBudget` throws — caller must re-shard at finer granularity, NOT silently truncate.
**Guarantees:**
- 1. Every `generate()` call goes through a budget check against the model's declared `context_window` minus `safety_margin`.
- 2. On overflow, the declared `overflow_policy` fires — one of: summarize oldest tool results via T3, cosine-rank top-K lessons, two-pass map-reduce across shards, or escalate to a bigger-context model.
- 3. Chunk shards of large corpora (playbook_memory, lesson archive) are precomputed into `data/_chunk_cache/` keyed by corpus hash. Invalidation is automatic when the hash changes.
- 4. Token estimation uses `chars / 4` (biased safe by ~15%) until we wire the provider's tokenizer.
+ 1. No agent call can silently truncate. Either it completes, continues, or throws with numbers.
+ 2. No corpus is too big — `generateTreeSplit` handles any size the caller can shard.
+ 3. Scratchpad is the glue between multi-call responses; context is never lost, only compacted.
+ 4. Token estimation uses `chars / 4` (biased safe ~15%) until we wire the provider's tokenizer.
- **What lives where:**
- - Budget checker in `crates/aibridge/src/budget.rs` (new)
- - Chunker in `crates/aibridge/src/chunk.rs` (new) — deterministic splits on sentence/paragraph boundaries, each shard ≤ policy cap
- - Cache service in `crates/storaged/src/chunk_cache.rs` (new) — `corpus_hash → [shard_id, tokens, content]`
- - Agent helper `tests/multi-agent/agent.ts::generateSafe()` wraps `generate()` + `generateCloud()` with the checker
- - Metric emitted on every overflow: `context_overflow{tier=?,policy=?,model=?}` → surfaces on /metrics
+ **What lives where now:**
+ - `agent.ts::estimateTokens()` + `assertContextBudget()` + `generateContinuable()` + `generateTreeSplit()` — WIRED
+ - `scenario.ts` executor + reviewer + overviewGenerate calls — migrated to `generateContinuable`
+ - `config/models.json` — context_window + context_budget + overflow_policies per tier (declarative)
- **Status:** `context_window` + `context_budget` + `overflow_policies` WIRED in config/models.json. Enforcement helpers NOT yet wired. Implementation target: current sprint — this is a stability floor, not a feature.
**Next sprint (Rust side, so gateway tools share it):**
- `crates/aibridge/src/continuation.rs` — port of `generateContinuable`
- `crates/aibridge/src/tree_split.rs` — port of `generateTreeSplit`
- `crates/storaged/src/chunk_cache.rs` — precomputed shards keyed by corpus hash (avoid re-chunking on every T4 run)
- `/metrics` counter: `context_continuations_total{model,shape,succeeded}`
- ### Phase 22+: Further horizon
+ **Status:** TS primitives WIRED. Rust port pending. The escalation path (tree split → bigger-context cloud model → kimi-k2:1t's 1M window → split decision into sub-decisions) is declared in `config/models.json` under `context_management.overflow_policies`.
### Phase 22: Internal Knowledge Library (KB)
Meta-layer over Phase 19 playbook_memory. Playbook memory answers "which WORKERS worked for this event." The KB answers "which CONFIG worked for this playbook signature." Subject changes from workers to the system itself — model choice, budget hints, overflow policies, pathway notes.
**Files (`data/_kb/`):**
- `signatures.jsonl` — (sig_hash, embedding[], first_seen, last_seen, run_count). Sig = stable hash of the sequence of (kind, role, count, city, state) across events.
- `outcomes.jsonl` — per-run record: {sig, run_id, models, ok/total, turns, citations, per-event summary, elapsed}.
- `pathway_recommendations.jsonl` — AI-synthesized for next run: {confidence, rationale, top_models, budget_hints, pathway_notes, neighbors_consulted}.
- `error_corrections.jsonl` — detected fail→succeed pairs on same sig, diff of what changed.
- `config_snapshots.jsonl` — history of models.json changes + why.
**Cycle (event-driven, not wall-clock):**
1. Scenario ends → `kb.indexRun()` extracts signature, embeds spec digest, appends outcome.
2. `kb.recommendFor(nextSpec)` finds k-NN signatures via cosine, feeds their outcome history + recent error corrections to the overview model, writes a structured recommendation.
3. Next scenario starts → `kb.loadRecommendation(spec)` pulls the newest rec for this sig, injects `pathway_notes` into `guidanceFor()` alongside prior lessons.
**Why file-based for MVP:** Phase 19 playbook_memory is already a catalogd dataset. KB is a separate meta-layer; keep it file-based first to iterate without a gateway schema migration. Rust port (and promotion to vectord-indexed corpus for neighbor search at scale) lands once shape stabilizes — mirrors how Phase 21 primitives were TS-first → Rust next sprint.
**What the overview model gets asked:**
- Target scenario digest
- Top-k neighbor signatures with avg ok rate, best model combo per neighbor
- Recent error corrections (sig, before/after model set)
**What it outputs (JSON-constrained):**
- confidence (high/medium/low)
- rationale (2-3 sentences)
- top_models {executor, reviewer, overview}
- budget_hints {executor_max_tokens, reviewer_max_tokens, executor_think}
- pathway_notes (concrete pre-run advice)
**Status (WIRED 2026-04-21):** `tests/multi-agent/kb.ts` holds all primitives. scenario.ts reads rec at start, indexes + recommends at end. Cold start gracefully writes a "low confidence, no history" rec so the second run has a floor to build on.
### Phase 23+: Further horizon
- Specialized fine-tuned models per domain (staffing matcher, resume parser)
- Video/audio transcript ingest + multimodal embeddings

scripts/ab_t3_test.sh (new executable file, 77 lines)
View File

@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# A/B test of T3 overseer: does it actually make subsequent runs better?
# Chains Run B (T3 seed) → Run C (T3 + read-back) → Run D (T3 cloud).
# Run A is assumed already complete (launched separately). Aggregates
# metrics at the end into ab_scorecard.json.
set -e
cd "$(dirname "$0")/.."
export OLLAMA_CLOUD_KEY="$(python3 -c "import json; print(json.load(open('/root/llm_team_config.json'))['providers']['ollama_cloud']['api_key'])")"
echo "▶ A/B test start at $(date -Iseconds)"
echo "▶ prior lessons dir: $(ls data/_playbook_lessons 2>/dev/null | wc -l) files"
# Run B — T3 enabled local, no prior lessons should exist yet
echo "──── RUN B: T3 local, seeds first lesson ────"
RC=0; bun tests/multi-agent/scenario.ts > /tmp/lakehouse_ab_B.log 2>&1 || RC=$?
echo "  B exit=$RC"
ls data/_playbook_lessons/*.json 2>/dev/null | head -5
# Run C — T3 enabled local, B's lesson should load
echo "──── RUN C: T3 local, reads B's lesson ────"
RC=0; bun tests/multi-agent/scenario.ts > /tmp/lakehouse_ab_C.log 2>&1 || RC=$?
echo "  C exit=$RC"
# Run D — T3 enabled CLOUD (gpt-oss:120b), reads B+C lessons
echo "──── RUN D: T3 cloud, reads B+C lessons ────"
RC=0; LH_OVERVIEW_CLOUD=1 bun tests/multi-agent/scenario.ts > /tmp/lakehouse_ab_D.log 2>&1 || RC=$?
echo "  D exit=$RC"
echo "▶ all runs done at $(date -Iseconds)"
echo "▶ scorecard:"
python3 -c "
import os, json, subprocess
labels = ['A(no-T3)','B(T3-seed)','C(T3-read)','D(T3-cloud)']
# ab_t3_test launches B/C/D; Run A was launched separately BEFORE this
# script, so the 4 most recent run dirs are D, C, B, A in reverse
# chronological order. Take them oldest-first so labels line up.
all_runs = subprocess.check_output(
    ['bash', '-c', 'ls -1dt tests/multi-agent/playbooks/scenario-* | head -4']
).decode().strip().split('\n')
top4 = list(reversed(all_runs))  # oldest first → A,B,C,D
rows = []
for i, path in enumerate(top4):
    try:
        results = json.load(open(os.path.join(path, 'results.json')))
    except FileNotFoundError:
        continue
    ok = sum(1 for r in results if r.get('ok'))
    turns = sum(r.get('turns', 0) for r in results)
    gaps = sum(len(r.get('gap_signals', [])) for r in results)
    cites = sum(len(r.get('playbook_citations') or []) for r in results)
    prior = []
    try:
        prior = json.load(open(os.path.join(path, 'prior_lessons.json')))
    except FileNotFoundError:
        pass
    rows.append({
        'label': labels[i] if i < len(labels) else f'run{i}',
        'path': path,
        'ok_events': ok,
        'total_events': len(results),
        'total_turns': turns,
        'total_gaps': gaps,
        'total_citations': cites,
        'prior_lessons_loaded': len(prior),
    })
scorecard = {'generated_at': __import__('datetime').datetime.utcnow().isoformat()+'Z', 'runs': rows}
open('tests/multi-agent/playbooks/ab_scorecard.json','w').write(json.dumps(scorecard, indent=2))
print(json.dumps(scorecard, indent=2))
"
echo "▶ saved: tests/multi-agent/playbooks/ab_scorecard.json"

scripts/kb_measure.py (new executable file, 163 lines)
View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""Aggregate KB state for item 3 decision.
Reads data/_kb/*.jsonl and tests/multi-agent/playbooks/*/results.json
to answer:
- How many distinct signatures exist?
- Total runs, avg ok rate, avg citations per event?
- Which (role, city) combos have NEVER gotten a citation?
- Recommender confidence progression (cold medium high)?
- Mean turn count trend across runs (proxy for efficiency).
Run after `scripts/run_kb_batch.sh` completes. Writes a markdown
summary to tests/multi-agent/playbooks/kb_measurement.md and prints
to stdout.
"""
import json
import os
import sys
from collections import Counter, defaultdict
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
KB = ROOT / "data" / "_kb"
PLAYBOOKS = ROOT / "tests" / "multi-agent" / "playbooks"
def load_jsonl(p):
    if not p.exists():
        return []
    out = []
    for line in p.read_text().splitlines():
        if line.strip():
            try:
                out.append(json.loads(line))
            except json.JSONDecodeError:
                pass
    return out

def main():
    sigs = load_jsonl(KB / "signatures.jsonl")
    outcomes = load_jsonl(KB / "outcomes.jsonl")
    recs = load_jsonl(KB / "pathway_recommendations.jsonl")
    corrections = load_jsonl(KB / "error_corrections.jsonl")

    # --- Basic counts ---
    print(f"Signatures: {len(sigs)}")
    print(f"Outcomes: {len(outcomes)}")
    print(f"Recommendations: {len(recs)}")
    print(f"Error corrections: {len(corrections)}")
    print()

    # --- Recommender confidence progression ---
    conf_counts = Counter(r.get("confidence", "?") for r in recs)
    print("Recommender confidence distribution:")
    for c in ("high", "medium", "low"):
        print(f"  {c:8s}: {conf_counts.get(c, 0)}")
    print()

    # Time-ordered confidence
    recs_sorted = sorted(recs, key=lambda r: r.get("generated_at", ""))
    neighbor_counts = [len(r.get("neighbors_consulted", [])) for r in recs_sorted]
    if neighbor_counts:
        print("Neighbors consulted over time (first → last):")
        print(f"  first 3: {neighbor_counts[:3]}")
        print(f"  last 3:  {neighbor_counts[-3:]}")
        print(f"  max:     {max(neighbor_counts)}")
    print()

    # --- Fill rate + citation density per run ---
    if outcomes:
        total_ok = sum(o["ok_events"] for o in outcomes)
        total_events = sum(o["total_events"] for o in outcomes)
        total_cites = sum(o.get("total_citations", 0) for o in outcomes)
        total_turns = sum(o.get("total_turns", 0) for o in outcomes)
        print(f"Fill rate: {total_ok}/{total_events} = {100*total_ok/max(1,total_events):.1f}%")
        print(f"Avg citations per run: {total_cites/len(outcomes):.2f}")
        print(f"Avg turns per run: {total_turns/len(outcomes):.1f}")
        print()
        # First 5 runs vs last 5 — does it get better?
        sorted_out = sorted(outcomes, key=lambda o: o.get("created_at", ""))
        if len(sorted_out) >= 10:
            first = sorted_out[:5]
            last = sorted_out[-5:]
            fok = sum(o["ok_events"] for o in first) / sum(o["total_events"] for o in first)
            lok = sum(o["ok_events"] for o in last) / sum(o["total_events"] for o in last)
            fcit = sum(o.get("total_citations", 0) for o in first) / 5
            lcit = sum(o.get("total_citations", 0) for o in last) / 5
            print(f"First 5 runs ok rate: {100*fok:.1f}%  avg cites: {fcit:.2f}")
            print(f"Last 5 runs ok rate:  {100*lok:.1f}%  avg cites: {lcit:.2f}")
            print()

    # --- Per-(role, city) citation coverage ---
    # Read the playbook dirs for full event detail; the per_event
    # summaries in outcomes.jsonl carry role but not city.
    cites_by_role_city = defaultdict(lambda: {"attempts": 0, "citations": 0, "ok": 0})
    for o in outcomes:
        run_dir = PLAYBOOKS / o["run_id"]
        results_file = run_dir / "results.json"
        if not results_file.exists():
            continue
        try:
            results = json.loads(results_file.read_text())
        except Exception:
            continue
        for r in results:
            e = r.get("event", {})
            key = (e.get("role"), e.get("city"), e.get("state"))
            cites_by_role_city[key]["attempts"] += 1
            cites_by_role_city[key]["citations"] += len(r.get("playbook_citations") or [])
            if r.get("ok"):
                cites_by_role_city[key]["ok"] += 1
    combos_with_cites = [(k, v) for k, v in cites_by_role_city.items() if v["citations"] > 0]
    combos_zero_cites = [(k, v) for k, v in cites_by_role_city.items() if v["citations"] == 0 and v["ok"] > 0]
    print(f"(role, city, state) combos with any citation: {len(combos_with_cites)}")
    print(f"(role, city, state) combos with ok fills but 0 cites: {len(combos_zero_cites)}")
    print()
    if combos_with_cites:
        print("Top 10 combos by citation count:")
        for (role, city, state), v in sorted(combos_with_cites, key=lambda x: -x[1]["citations"])[:10]:
            print(f"  {role:25s} {city:15s} {state}: {v['citations']} cites across {v['attempts']} attempts ({v['ok']} ok)")
        print()

    # --- Write markdown report ---
    lines = ["# KB Measurement Report", ""]
    lines.append(f"Generated from {len(outcomes)} runs across {len(sigs)} distinct signatures.")
    lines.append("")
    lines.append("## Recommender confidence")
    for c in ("high", "medium", "low"):
        lines.append(f"- {c}: {conf_counts.get(c, 0)}")
    lines.append("")
    lines.append("## Overall fill + citation")
    if outcomes:
        lines.append(f"- Fill rate: **{total_ok}/{total_events}** ({100*total_ok/max(1,total_events):.1f}%)")
        lines.append(f"- Avg citations per run: **{total_cites/len(outcomes):.2f}**")
        lines.append(f"- Avg turns per run: {total_turns/len(outcomes):.1f}")
    lines.append("")
    lines.append("## Citation coverage by (role, city, state)")
    lines.append(f"- Combos with ≥1 citation: {len(combos_with_cites)}")
    lines.append(f"- Combos with ok fills but 0 citations: {len(combos_zero_cites)}")
    lines.append("")
    lines.append("## Item 3 decision signal")
    if combos_zero_cites:
        lines.append("Non-zero: there are **combos that succeeded but never triggered playbook_memory boost**. Candidates for item 3 investigation:")
        for (role, city, state), v in combos_zero_cites[:5]:
            lines.append(f"- {role} in {city}, {state}: {v['ok']}/{v['attempts']} ok, 0 cites")
    else:
        lines.append("All ok combos got at least some citation firing. Boost mechanism is healthy; raising the cap may help but isn't forced.")
    lines.append("")

    out = PLAYBOOKS / "kb_measurement.md"
    out.write_text("\n".join(lines))
    print(f"✓ markdown report → {out}")

if __name__ == "__main__":
    main()

scripts/run_kb_batch.sh (new executable file, 43 lines)
View File

@@ -0,0 +1,43 @@
#!/usr/bin/env bash
# Run all generated scenarios sequentially to populate the KB.
# Reads tests/multi-agent/scenarios/manifest.json and feeds each file
# to scenario.ts. Each scenario indexes into data/_kb/ automatically
# via the end-of-run hook. Exit code: 0 if all scenarios completed
# (event failures are NOT failures for the batch — we want the KB to
# record both successes AND failures).
set -e
cd "$(dirname "$0")/.."
export OLLAMA_CLOUD_KEY="$(python3 -c "import json; print(json.load(open('/root/llm_team_config.json'))['providers']['ollama_cloud']['api_key'])" 2>/dev/null || echo '')"
MANIFEST="tests/multi-agent/scenarios/manifest.json"
if [ ! -f "$MANIFEST" ]; then
echo "✗ no manifest at $MANIFEST — run: bun tests/multi-agent/gen_scenarios.ts <N>"
exit 1
fi
START_TS=$(date -Iseconds)
LOG_DIR="/tmp/lakehouse_kb_batch_$(date +%s)"
mkdir -p "$LOG_DIR"
echo "▶ KB batch start: $START_TS, logs → $LOG_DIR"
python3 -c "
import json
m = json.load(open('$MANIFEST'))
for s in m['scenarios']:
    print(s['file'])
" | while read -r SCEN; do
SPEC="tests/multi-agent/scenarios/$SCEN"
BASE=$(basename "$SPEC" .json)
LOG="$LOG_DIR/${BASE}.log"
echo "$SCEN"
bun tests/multi-agent/scenario.ts "$SPEC" > "$LOG" 2>&1 || true
OK=$(grep -oP '\d+/\d+ events succeeded' "$LOG" | tail -1); OK=${OK:-no-result}
SIG=$(grep -oP 'KB indexed: sig=\K[a-f0-9]+' "$LOG" | tail -1); SIG=${SIG:--}
echo "$OK; sig=$SIG"
done
echo "▶ KB batch done: $(date -Iseconds)"
echo "▶ KB state:"
wc -l data/_kb/*.jsonl 2>/dev/null || true
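Typical sequence, per the notes above: `bun tests/multi-agent/gen_scenarios.ts 20` to emit specs + manifest, then `scripts/run_kb_batch.sh` for the unattended batch, then `scripts/kb_measure.py` for the before/after report.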

View File

@@ -16,6 +16,10 @@ class GenerateRequest(BaseModel):
    system: str | None = None
    temperature: float = 0.7
    max_tokens: int = 2048
    # think=false disables hidden reasoning blocks on thinking models
    # (qwen3, qwen3.5, gpt-oss). Required for hot-path JSON emitters
    # that need the whole token budget for the visible response.
    think: bool | None = None
class GenerateResponse(BaseModel):
@@ -40,6 +44,8 @@ async def generate(req: GenerateRequest):
    }
    if req.system:
        payload["system"] = req.system
    if req.think is not None:
        payload["think"] = req.think
    async with client() as c:
        resp = await c.post("/api/generate", json=payload)

View File

@@ -22,6 +22,214 @@ export function estimateTokens(text: string): number {
return Math.ceil(text.length / 4);
}
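// assertContextBudget() is used throughout this diff but defined above
// the hunk. Assumed shape, inferred from the call sites here and the
// PRD's "window minus safety_margin" rule (a sketch, not the real code):
//
//   function assertContextBudget(model: string, prompt: string,
//       opts: { system?: string; max_tokens?: number; bypass?: boolean } = {}) {
//     if (opts.bypass) return;
//     const window = CONTEXT_WINDOWS[model] ?? 32768;
//     const need = estimateTokens(prompt) + estimateTokens(opts.system ?? "")
//       + (opts.max_tokens ?? 800) + 2000; // safety_margin per models.json
//     if (need > window) throw new Error(
//       `context budget: ~${need} tokens > ${model} window ${window}`);
//   }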
// ============================================================
// Scratchpad + tree-split continuation
// ============================================================
//
// Core problem: when a prompt OR response would exceed a model's window
// — e.g. 200 playbooks pasted in, or a long propose_done payload — we
// cannot just raise max_tokens forever. That's the tourniquet approach
// J explicitly rejected. The right answer is: split into sub-calls
// glued by a persistent scratchpad, so no context is ever lost.
//
// Two primitives:
//
// 1. generateContinuable(): for OUTPUT overflow. If the model returns
// a structurally-incomplete JSON (unmatched braces, truncated
// mid-value), auto-continue with the partial as scratchpad until
// the JSON parses or max_continuations is hit.
//
// 2. generateTreeSplit(): for INPUT overflow. If prompt + system +
// max_tokens exceeds window, shard the input at semantic boundaries,
// run each shard through the model with a running scratchpad digest,
// then run a final reduce pass to combine. This is map-reduce with
// glue.
//
// Both are tier-agnostic — they wrap generate() and generateCloud().
// Check structural completeness of a response. For non-JSON responses
// (lesson text, checkpoint hints) we treat "non-empty" as complete.
// For JSON-shaped responses (executor/reviewer actions) we check that
// braces match AND that JSON.parse succeeds on the first {...} block.
function isStructurallyComplete(text: string, shape: "json" | "text"): boolean {
if (!text || !text.trim()) return false;
if (shape === "text") return true;
// Strip ``` fences
let s = text.trim();
if (s.startsWith("```")) s = s.replace(/^```(?:json)?\n?/, "").replace(/```$/, "").trim();
const start = s.indexOf("{");
const end = s.lastIndexOf("}");
if (start < 0 || end <= start) return false;
const slice = s.slice(start, end + 1);
// Balance check — cheaper than JSON.parse and catches truncated nests
let depth = 0, inStr = false, esc = false;
for (const c of slice) {
if (esc) { esc = false; continue; }
if (c === "\\") { esc = true; continue; }
if (c === '"') { inStr = !inStr; continue; }
if (inStr) continue;
if (c === "{") depth++;
else if (c === "}") depth--;
}
if (depth !== 0) return false;
try { JSON.parse(slice); return true; } catch { return false; }
}
// Continue a truncated response. We do NOT ask the model to start over —
// we ask it to continue from exactly where it stopped. The partial goes
// in as scratchpad so it knows what's already committed.
async function continueResponse(
model: string,
originalPrompt: string,
partial: string,
opts: { max_tokens: number; temperature: number; system?: string; cloud: boolean; think?: boolean },
): Promise<string> {
const continuationPrompt = `${originalPrompt}
PARTIAL RESPONSE SO FAR (continue from here do NOT restart, do NOT repeat what's already there, emit ONLY the remaining tokens to close the structure):
${partial}`;
const fn = opts.cloud ? generateCloud : generate;
const rest = await fn(model, continuationPrompt, {
max_tokens: opts.max_tokens,
temperature: opts.temperature,
system: opts.system,
bypass_budget: true, // the caller already checked; continuation doesn't double-count
think: opts.think,
});
return partial + rest;
}
// Output-overflow handler. Handles two distinct failure modes:
// (a) EMPTY response — thinking model ate the whole budget. Retry the
// ORIGINAL prompt with 2x the budget (geometric backoff up to cap).
// (b) TRUNCATED non-empty — model got most of the way there but hit
// max_tokens before closing. Continue with the partial as
// scratchpad.
// The two modes need different repair: (a) needs more budget, not
// continuation from ""; (b) needs scratchpad-glued continuation.
export async function generateContinuable(
model: string,
prompt: string,
opts: {
max_tokens?: number;
temperature?: number;
system?: string;
shape?: "json" | "text";
max_continuations?: number;
cloud?: boolean;
think?: boolean;
on_continuation?: (n: number, combined_len: number) => void;
} = {},
): Promise<string> {
const shape = opts.shape ?? "json";
const initialMax = opts.max_tokens ?? 800;
const maxConts = opts.max_continuations ?? 3;
const cloud = opts.cloud ?? false;
const budgetCap = 8000; // don't geometric-backoff forever
const fn = cloud ? generateCloud : generate;
let combined = "";
let currentMax = initialMax;
// Initial call + empty-response backoff loop.
for (let retry = 0; retry < 3; retry++) {
const out = await fn(model, prompt, {
max_tokens: currentMax,
temperature: opts.temperature,
system: opts.system,
think: opts.think,
});
if (out.trim().length > 0) { combined = out; break; }
// Empty — thinking model ate the budget. Double it and retry.
if (opts.on_continuation) opts.on_continuation(retry + 1, 0);
currentMax = Math.min(currentMax * 2, budgetCap);
}
// Structural completion loop (continuation from partial).
for (let i = 0; i < maxConts; i++) {
if (isStructurallyComplete(combined, shape)) return combined;
if (opts.on_continuation) opts.on_continuation(i + 1, combined.length);
combined = await continueResponse(model, prompt, combined, {
max_tokens: Math.min(currentMax, budgetCap),
temperature: opts.temperature ?? 0.3,
system: opts.system,
cloud,
think: opts.think,
});
}
// Last-resort: return combined even if incomplete; caller's parser
// will throw with the raw text for forensics rather than silently
// truncating.
return combined;
}
// Input-overflow handler. When the input corpus exceeds the window,
// shard → map → reduce with a running scratchpad digest.
//
// shards: array of input chunks the caller already split at semantic
// boundaries (paragraphs, records, playbook entries).
// map_prompt: fn taking (shard, running_scratchpad) → prompt for a
// single map call. Must fit within window.
// reduce_prompt: fn taking (combined_scratchpad) → final prompt. Also
// must fit window; if the scratchpad itself overflows, this triggers
// a recursive tree-split.
//
// Result: the reduce call's response.
export async function generateTreeSplit(
model: string,
shards: string[],
map_prompt: (shard: string, scratchpad: string) => string,
reduce_prompt: (scratchpad: string) => string,
opts: {
max_tokens?: number;
temperature?: number;
system?: string;
cloud?: boolean;
on_shard?: (i: number, total: number) => void;
scratchpad_budget?: number;
} = {},
): Promise<{ response: string; scratchpad: string; shards_processed: number }> {
const cloud = opts.cloud ?? false;
const scratchpadBudget = opts.scratchpad_budget ?? 6000;
let scratchpad = "";
for (let i = 0; i < shards.length; i++) {
if (opts.on_shard) opts.on_shard(i + 1, shards.length);
const shardPrompt = map_prompt(shards[i], scratchpad);
// If the per-shard prompt alone exceeds window, the caller sharded
// too coarsely — bubble up rather than silently truncating.
assertContextBudget(model, shardPrompt, {
system: opts.system,
max_tokens: opts.max_tokens,
bypass: false,
});
const shardOut = await generateContinuable(model, shardPrompt, {
max_tokens: opts.max_tokens ?? 800,
temperature: opts.temperature,
system: opts.system,
shape: "text",
cloud,
});
// Append to scratchpad; truncate oldest if over budget.
scratchpad = (scratchpad + `\n— shard ${i + 1}/${shards.length} digest —\n` + shardOut.trim()).slice(-scratchpadBudget * 4);
}
const reducePrompt = reduce_prompt(scratchpad);
assertContextBudget(model, reducePrompt, {
system: opts.system,
max_tokens: opts.max_tokens,
bypass: false,
});
const response = await generateContinuable(model, reducePrompt, {
max_tokens: opts.max_tokens ?? 1500,
temperature: opts.temperature,
system: opts.system,
shape: "text",
cloud,
});
return { response, scratchpad, shards_processed: shards.length };
}
// Known context windows — matches crates/../config/models.json. Kept in
// code as a fallback so the test harness doesn't crash if the config is
// missing. Production path should read from models.json.
@@ -29,10 +237,14 @@ export const CONTEXT_WINDOWS: Record<string, number> = {
"mistral:latest": 32768,
"qwen2.5:latest": 32768,
"qwen3:latest": 40960,
"qwen3.5:latest": 262144, // local 9.7B — new executor
"qwen3-embedding": 32768, // local embedding model
"nomic-embed-text-v2-moe": 2048, // local embedding, MoE
"gpt-oss:20b": 131072,
"gpt-oss:120b": 131072,
"qwen3.5:397b": 131072,
"kimi-k2-thinking": 200000,
"kimi-k2.6": 200000, // cloud — new T4 candidate
"kimi-k2:1t": 1048576,
"deepseek-v3.1:671b": 131072,
"glm-4.7": 131072,
@@ -89,6 +301,7 @@ export interface LogEntry {
| "critique"
| "propose_done"
| "consensus_done"
| "note"
| "error";
content: any;
}
@@ -105,7 +318,11 @@ export type Action =
export interface Fill {
candidate_id: string;
name: string;
- reason: string;
+ reason?: string; // optional — the schema used to require it; nothing
+                  // downstream consumed it, and qwen3.5 would emit
+                  // 100-150 char justifications per fill that blew
+                  // the JSON budget on 4+ fills. Kept optional so
+                  // models that still emit it don't break parsing.
}
// --- HTTP helpers (fail-fast) ---
@@ -135,7 +352,10 @@ export async function callTool(tool: string, args: Record<string, any>): Promise
});
}
- export async function hybridSearch(sql_filter: string, question: string, k = 10): Promise<any> {
+ // Default k=20 is a floor, not a ceiling — executor prompt instructs
+ // models to scale k to 5× target_count (cap 80) so multi-fill events
+ // get a meaningfully deep pool to rank within.
+ export async function hybridSearch(sql_filter: string, question: string, k = 20): Promise<any> {
return http("POST", `${GATEWAY}/vectors/hybrid`, { sql_filter, question, k });
}
@@ -150,6 +370,7 @@ export async function generate(model: string, prompt: string, opts: {
temperature?: number;
system?: string;
bypass_budget?: boolean;
think?: boolean;
} = {}): Promise<string> {
assertContextBudget(model, prompt, {
system: opts.system,
@@ -163,11 +384,13 @@
max_tokens: opts.max_tokens ?? 800,
};
if (opts.system) body.system = opts.system;
if (opts.think !== undefined) body.think = opts.think;
const r = await http<any>("POST", `${SIDECAR}/generate`, body);
- const text = r.text ?? "";
- if (!text || typeof text !== "string") {
-     throw new Error(`generate returned empty text from ${model}: ${JSON.stringify(r).slice(0, 200)}`);
- }
+ const text = typeof r.text === "string" ? r.text : "";
+ // Do NOT throw on empty. Thinking models (gpt-oss, qwen3.5) burn the
+ // max_tokens budget on hidden reasoning and emit "" when budget was
+ // too tight. generateContinuable detects empty + continues with more
+ // budget. Callers that expected non-empty can check themselves.
return text;
}
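// Hot-path call shape after this change (illustrative):
//   await generate("qwen3.5:latest", prompt,
//     { max_tokens: 1000, temperature: 0.3, think: false });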
@@ -181,6 +404,7 @@ export async function generateCloud(model: string, prompt: string, opts: {
temperature?: number;
system?: string;
bypass_budget?: boolean;
think?: boolean;
} = {}): Promise<string> {
if (!OLLAMA_CLOUD_KEY) {
throw new Error("OLLAMA_CLOUD_KEY not set; cannot reach Ollama Cloud");
@@ -200,6 +424,7 @@
},
};
if (opts.system) body.system = opts.system;
if (opts.think !== undefined) body.think = opts.think;
const resp = await fetch(`${OLLAMA_CLOUD_URL}/api/generate`, {
method: "POST",
headers: {
@@ -212,10 +437,9 @@
throw new Error(`Ollama Cloud ${resp.status}: ${await resp.text().catch(() => "?")}`);
}
const data: any = await resp.json();
- const text = data.response ?? "";
- if (!text) {
-     throw new Error(`Ollama Cloud returned empty response for ${model}: ${JSON.stringify(data).slice(0, 200)}`);
- }
+ const text = typeof data.response === "string" ? data.response : "";
+ // Same non-throw policy as local generate() — empty text is a valid
+ // signal that thinking ate the budget. Let generateContinuable retry.
return text;
}
@@ -231,7 +455,11 @@ Available tools (each takes a JSON "args" object):
{"index_name":"workers_500k_v1",
"sql_filter":"role = 'Forklift Operator' AND city = 'Toledo' AND state = 'OH' AND CAST(availability AS DOUBLE) > 0.5",
"question":"reliable forklift operator Toledo",
"k":10}
"k":40}
k should scale with target_count: roughly 5× the number of fills
needed, floor 20, cap 80. For 1-2 fills use k=20. For 5 fills use
k=40. A deep pool lets the ranker discriminate across a larger
candidate set; k=10 was too tight for multi-fill events.
- sql(query: string)
Raw read-only SELECT. Use for verification (confirm a worker exists,
@@ -363,8 +591,8 @@ Your next action MUST be a JSON object matching one of these shapes:
use on turn 1 to outline your approach. Steps must be concrete.
{"kind":"tool_call","tool":"...","args":{...},"rationale":"why"}
call a tool and see its result next turn.
{"kind":"propose_done","fills":[{"candidate_id":"...","name":"First Last","reason":"why them"}],"rationale":"..."}
propose you've met the target. fills MUST have EXACTLY ${task.target_count} entries count twice before emitting.
{"kind":"propose_done","fills":[{"candidate_id":"...","name":"First Last"}],"rationale":"..."}
propose you've met the target. fills MUST have EXACTLY ${task.target_count} entries count twice before emitting. Each fill is ONLY {candidate_id, name} no reason field, no scores, no commentary.
Strategy tip: once "CANDIDATES SURFACED SO FAR" has ${task.target_count} entries in ${task.target_city}, ${task.target_state} matching ${task.target_role}, verify ONE via the sql tool (to satisfy the reviewer's SQL-verification criterion) and then propose_done with the top ${task.target_count}. Don't keep re-searching.

View File

@@ -0,0 +1,187 @@
// Scenario generator for Phase 22 KB corpus-building.
//
// Emits N unique ScenarioSpec JSON files under
// tests/multi-agent/scenarios/ covering:
// - different clients (so sig varies even when events match)
// - different city/state combos actually present in workers_500k
// - varied event sequences (baseline/recurring/expansion/emergency/misplacement)
// - varied role mixes from the industrial staffing taxonomy
//
// Each scenario spec is written as scen_NN_CLIENT_CITY.json and can be
// fed to scenario.ts as argv[2]. A sibling run_batch.sh runs them all
// sequentially so the KB populates overnight.
//
// Determinism: the RNG seed is argv[3] (default 42; argv[2] is the
// scenario count N) so repeat invocations produce identical specs.
import { mkdir, writeFile } from "node:fs/promises";
import { join } from "node:path";
// Deterministic PRNG — mulberry32, same as many test harnesses. Stable
// across bun versions; not cryptographic.
function mulberry32(seed: number) {
let s = seed >>> 0;
return () => {
s = (s + 0x6D2B79F5) >>> 0;
let t = s;
t = Math.imul(t ^ (t >>> 15), t | 1);
t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
};
}
// Cities known to exist in workers_500k, chosen to avoid false-empty
// searches. All Midwest because that's the target persona's geography.
const CITIES: Array<{ city: string; state: string }> = [
{ city: "Toledo", state: "OH" },
{ city: "Cleveland", state: "OH" },
{ city: "Columbus", state: "OH" },
{ city: "Cincinnati", state: "OH" },
{ city: "Akron", state: "OH" },
{ city: "Detroit", state: "MI" },
{ city: "Grand Rapids", state: "MI" },
{ city: "Flint", state: "MI" },
{ city: "Indianapolis", state: "IN" },
{ city: "Fort Wayne", state: "IN" },
{ city: "Gary", state: "IN" },
{ city: "Chicago", state: "IL" },
{ city: "Joliet", state: "IL" },
{ city: "Rockford", state: "IL" },
{ city: "Milwaukee", state: "WI" },
{ city: "Madison", state: "WI" },
{ city: "Louisville", state: "KY" },
{ city: "Lexington", state: "KY" },
{ city: "Kansas City", state: "MO" },
{ city: "St. Louis", state: "MO" },
];
// Industrial staffing role taxonomy. Weighted so common roles appear
// more often (realistic distribution).
const ROLES: Array<{ role: string; weight: number }> = [
{ role: "Warehouse Associate", weight: 5 },
{ role: "Machine Operator", weight: 4 },
{ role: "Forklift Operator", weight: 4 },
{ role: "Loader", weight: 3 },
{ role: "Material Handler", weight: 3 },
{ role: "Assembler", weight: 3 },
{ role: "Quality Tech", weight: 2 },
{ role: "Picker", weight: 3 },
{ role: "Packer", weight: 3 },
{ role: "Shipping Clerk", weight: 2 },
{ role: "Receiving Clerk", weight: 2 },
{ role: "Welder", weight: 2 },
{ role: "CNC Operator", weight: 2 },
{ role: "Maintenance Tech", weight: 1 },
];
const CLIENTS = [
"Riverfront Steel", "Northland Logistics", "Great Lakes Mfg",
"Midway Distribution", "Pioneer Assembly", "Cornerstone Fabrication",
"Horizon Supply", "Keystone Plastics", "Apex Warehouse",
"Heritage Foods", "Summit Industrial", "Vanguard Components",
"Centennial Packaging", "Parallel Machining", "Beacon Freight",
];
function pickWeighted<T extends { weight: number }>(rng: () => number, items: T[]): T {
const total = items.reduce((s, x) => s + x.weight, 0);
let r = rng() * total;
for (const x of items) { r -= x.weight; if (r <= 0) return x; }
return items[items.length - 1];
}
function pick<T>(rng: () => number, items: T[]): T {
return items[Math.floor(rng() * items.length)];
}
// Event shape templates. Each scenario picks 3-6 of these at random.
// Multi-fill counts skew low to make the harness quicker; 5+ fill
// events are the hardest and should be rarer in a corpus run.
type EventKind = "baseline_fill" | "recurring" | "expansion" | "emergency" | "misplacement";
function makeEvent(
rng: () => number,
kind: EventKind,
at: string,
city: string,
state: string,
): any {
const { role } = pickWeighted(rng, ROLES);
const count = kind === "misplacement" ? 1
: kind === "expansion" ? 2 + Math.floor(rng() * 4) // 2-5
: kind === "baseline_fill" ? 1 + Math.floor(rng() * 3) // 1-3
: kind === "recurring" ? 1 + Math.floor(rng() * 2) // 1-2
: /* emergency */ 2 + Math.floor(rng() * 3); // 2-4
const hour = 8 + Math.floor(rng() * 10); // 08:00-17:00 starts
const min = rng() > 0.5 ? 0 : 30; // seeded rng, not Math.random(), to keep specs deterministic
const at_real = `${String(hour).padStart(2, "0")}:${String(min).padStart(2, "0")}`;
const hour12 = hour % 12 === 0 ? 12 : hour % 12;
const ampm = hour < 12 ? "AM" : "PM";
return {
kind,
at: at_real,
role,
count,
city,
state,
shift_start: `${hour12}:${String(min).padStart(2, "0")} ${ampm}`,
};
}
function genSpec(rng: () => number, id: number): any {
const client = pick(rng, CLIENTS);
const { city, state } = pick(rng, CITIES);
const today = new Date();
const date = new Date(today.getTime() + id * 86400000)
.toISOString().split("T")[0];
// Scenario shape mix: emergency and misplacement each have a 40%
// chance of inclusion (rng() > 0.6), so roughly a third of scenarios
// are pure fill (baseline/recurring/expansion) and the rest are mixed.
const includeEmergency = rng() > 0.6;
const includeMisplacement = rng() > 0.6;
const events: any[] = [];
// always at least one baseline
events.push(makeEvent(rng, "baseline_fill", "08:00", city, state));
if (rng() > 0.3) events.push(makeEvent(rng, "recurring", "10:30", city, state));
if (rng() > 0.5) events.push(makeEvent(rng, "expansion", "12:15", city, state));
if (includeEmergency) events.push(makeEvent(rng, "emergency", "14:00", city, state));
if (includeMisplacement) {
const e = makeEvent(rng, "misplacement", "15:45", city, state);
if (events.length > 0) e.replaces_event = events[0].at;
events.push(e);
}
return { client, date, events };
}
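// Illustrative spec shape (hypothetical values; actual content depends
// on the seed):
// {
//   "client": "Riverfront Steel",
//   "date": "2026-04-22",
//   "events": [
//     { "kind": "baseline_fill", "at": "09:30", "role": "Picker", "count": 2,
//       "city": "Toledo", "state": "OH", "shift_start": "9:30 AM" },
//     { "kind": "misplacement", "at": "16:00", "role": "Welder", "count": 1,
//       "city": "Toledo", "state": "OH", "shift_start": "4:00 PM",
//       "replaces_event": "09:30" }
//   ]
// }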
async function main() {
const n = Number(process.argv[2] ?? 20);
const seed = Number(process.argv[3] ?? 42);
const rng = mulberry32(seed);
const outDir = "tests/multi-agent/scenarios";
await mkdir(outDir, { recursive: true });
const manifest: Array<{ file: string; client: string; city: string; events: number }> = [];
for (let i = 0; i < n; i++) {
const spec = genSpec(rng, i);
const cityLabel = spec.events[0].city.replace(/\s+/g, "_");
const fname = `scen_${String(i).padStart(3, "0")}_${spec.client.replace(/\s+/g, "_")}_${cityLabel}.json`;
await writeFile(join(outDir, fname), JSON.stringify(spec, null, 2));
manifest.push({
file: fname,
client: spec.client,
city: spec.events[0].city,
events: spec.events.length,
});
}
await writeFile(
join(outDir, "manifest.json"),
JSON.stringify({ count: n, seed, scenarios: manifest }, null, 2),
);
console.log(`✓ generated ${n} scenarios → ${outDir}/ (seed=${seed})`);
for (const m of manifest.slice(0, 5)) {
console.log(` ${m.file} → ${m.client} (${m.city}), ${m.events} events`);
}
if (manifest.length > 5) console.log(` ... +${manifest.length - 5} more`);
}
main().catch(e => {
console.error("gen_scenarios failed:", (e as Error).message);
process.exit(1);
});
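run_batch.sh itself isn't included in this diff. Below is a minimal Bun/TypeScript equivalent of the runner the header comment describes, as a sketch only: it assumes the manifest.json layout written by main() above and that scenario.ts (beside this file) takes the spec path as argv[2].

// run_batch.ts: hypothetical sketch, not part of this commit.
import { join } from "node:path";

const dir = "tests/multi-agent/scenarios";
const manifest = JSON.parse(await Bun.file(join(dir, "manifest.json")).text());
for (const m of manifest.scenarios) {
  console.log(`▶ ${m.file}`);
  const proc = Bun.spawn(["bun", "tests/multi-agent/scenario.ts", join(dir, m.file)], {
    stdout: "inherit",
    stderr: "inherit",
  });
  const code = await proc.exited;
  // Keep going on failure: one bad scenario shouldn't sink the overnight batch.
  if (code !== 0) console.log(`⚠ ${m.file} exited ${code}; continuing`);
}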

tests/multi-agent/kb.ts (new file, +448)

@@ -0,0 +1,448 @@
// Phase 22 — Internal Knowledge Library.
//
// Sits on top of the successful-playbook store. Tracks which
// configurations produced which outcomes for which playbook
// signatures, detects fail→succeed error corrections across runs,
// and emits pathway recommendations that the NEXT scenario reads
// at startup.
//
// Event-driven cycle (not wall-clock polling):
// scenario ends → kb.indexRun() → kb.recommendFor(nextSpec) → next
// scenario reads top rec and applies.
//
// File layout under data/_kb/:
// signatures.jsonl — (sig_hash, embedding[], first_seen, last_seen, run_count)
// outcomes.jsonl — append-only per-run: {sig, run_id, models, pathway, ok_events, elapsed_s, errors[]}
// config_snapshots.jsonl — models.json hash + active assignments at each change
// error_corrections.jsonl — fail→succeed deltas
// pathway_recommendations.jsonl — AI-synthesized suggestions
//
// Why file-based: Phase 19 playbook_memory is already in the
// catalogd+vectord stack. This is a separate meta-layer — keeping
// it file-based first lets us iterate quickly without a gateway
// schema migration. Rust port lands once the shape stabilizes
// (mirrors how Phase 21 primitives are TS-first → Rust next sprint).
import { mkdir, readFile, writeFile, appendFile, readdir } from "node:fs/promises";
import { join } from "node:path";
import { createHash } from "node:crypto";
import { SIDECAR, generateContinuable } from "./agent.ts";
const KB_DIR = "data/_kb";
const SIGNATURES_FILE = "signatures.jsonl";
const OUTCOMES_FILE = "outcomes.jsonl";
const CONFIG_SNAPSHOTS_FILE = "config_snapshots.jsonl";
const ERROR_CORRECTIONS_FILE = "error_corrections.jsonl";
const RECOMMENDATIONS_FILE = "pathway_recommendations.jsonl";
// What a playbook signature looks like — stable hash of the scenario
// shape that ignores timestamps and specific worker IDs. Two runs with
// the same sequence of (kind, role, count, city, state) get the same
// hash. This is the retrieval key for the KB.
export interface PlaybookSignature {
sig_hash: string;
client: string;
events_digest: string; // human-readable event summary
embedding: number[]; // sidecar embed, for nearest-neighbor
first_seen: string;
last_seen: string;
run_count: number;
}
// What we record after a run completes.
export interface RunOutcome {
sig_hash: string;
run_id: string; // scenario dir basename
date: string;
models: {
executor: string;
reviewer: string;
overview: string;
overview_cloud: boolean;
};
ok_events: number;
total_events: number;
total_turns: number;
total_gap_signals: number;
total_citations: number;
per_event: Array<{
at: string;
kind: string;
role: string;
count: number;
ok: boolean;
turns: number;
error: string | null;
}>;
elapsed_secs: number;
created_at: string;
}
// The AI-synthesized recommendation written back for future runs.
export interface PathwayRecommendation {
sig_hash: string;
generated_at: string;
generated_by_model: string;
confidence: "high" | "medium" | "low";
rationale: string; // prose from the recommender
top_models: { // suggested tier assignments
executor?: string;
reviewer?: string;
overview?: string;
};
budget_hints: { // suggested max_tokens / think flags
executor_max_tokens?: number;
reviewer_max_tokens?: number;
executor_think?: boolean;
};
pathway_notes: string; // "pre-fetch cert data", "use buffer of 3+"
neighbors_consulted: string[]; // sig_hashes of the playbooks that shaped this rec
}
// Compute a stable hash from the scenario spec. Two runs with the same
// events list (ignoring SMS drafts / timestamps / exclude lists) get
// the same hash.
export function computeSignature(spec: {
client: string;
events: Array<{ kind: string; role: string; count: number; city: string; state: string }>;
}): string {
const canonical = JSON.stringify({
client: spec.client,
events: spec.events.map(e => ({
kind: e.kind, role: e.role, count: e.count, city: e.city, state: e.state,
})),
});
return createHash("sha256").update(canonical).digest("hex").slice(0, 16);
}
// Human-readable digest of the spec, used in recommender prompts.
export function specDigest(spec: {
client: string;
events: Array<{ kind: string; role: string; count: number; city: string; state: string }>;
}): string {
return `${spec.client}: ` + spec.events.map(e =>
`${e.kind}/${e.role}×${e.count} in ${e.city},${e.state}`
).join(" | ");
}
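// e.g. "Riverfront Steel: baseline_fill/Picker×2 in Toledo,OH | emergency/Loader×4 in Toledo,OH"
// (illustrative values; clients and roles come from gen_scenarios.ts)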
async function ensureKb(): Promise<void> {
await mkdir(KB_DIR, { recursive: true });
}
async function readJsonl<T>(file: string): Promise<T[]> {
try {
const raw = await readFile(join(KB_DIR, file), "utf8");
return raw.split("\n").filter(l => l.trim()).map(l => JSON.parse(l) as T);
} catch {
return [];
}
}
// Index a completed scenario run. Extracts signature, computes
// embedding, appends outcome, updates or inserts signature entry.
export async function indexRun(
scenarioDir: string,
spec: { client: string; date: string; events: Array<any> },
models: { executor: string; reviewer: string; overview: string; overview_cloud: boolean },
elapsed_secs: number,
): Promise<{ sig_hash: string; outcome: RunOutcome }> {
await ensureKb();
const sig_hash = computeSignature(spec);
const digest = specDigest(spec);
// Read results.json produced by scenario.ts to build the outcome record.
const resultsRaw = await readFile(join(scenarioDir, "results.json"), "utf8");
const results = JSON.parse(resultsRaw) as any[];
const outcome: RunOutcome = {
sig_hash,
run_id: scenarioDir.split("/").pop() ?? scenarioDir,
date: spec.date,
models,
ok_events: results.filter(r => r.ok).length,
total_events: results.length,
total_turns: results.reduce((s, r) => s + (r.turns ?? 0), 0),
total_gap_signals: results.reduce((s, r) => s + (r.gap_signals?.length ?? 0), 0),
total_citations: results.reduce((s, r) => s + (r.playbook_citations?.length ?? 0), 0),
per_event: results.map(r => ({
at: r.event.at,
kind: r.event.kind,
role: r.event.role,
count: r.event.count,
ok: r.ok,
turns: r.turns ?? 0,
error: r.error ?? null,
})),
elapsed_secs,
created_at: new Date().toISOString(),
};
await appendFile(join(KB_DIR, OUTCOMES_FILE), JSON.stringify(outcome) + "\n");
// Embed + upsert signature. Skip if embedding fails — the outcome is
// still persisted, just without neighbor-search hookup.
try {
const resp = await fetch(`${SIDECAR}/embed`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ texts: [digest] }),
});
if (resp.ok) {
const data: any = await resp.json();
const embedding: number[] = data.embeddings?.[0] ?? [];
const existing = await readJsonl<PlaybookSignature>(SIGNATURES_FILE);
const now = new Date().toISOString();
const found = existing.find(s => s.sig_hash === sig_hash);
if (found) {
found.last_seen = now;
found.run_count += 1;
} else {
existing.push({
sig_hash,
client: spec.client,
events_digest: digest,
embedding,
first_seen: now,
last_seen: now,
run_count: 1,
});
}
// Rewrite the whole file — cheap while KB is small; swap to
// append-with-periodic-compact when row count exceeds ~10k.
await writeFile(
join(KB_DIR, SIGNATURES_FILE),
existing.map(s => JSON.stringify(s)).join("\n") + "\n",
);
}
} catch {
// Signature indexing failed — outcome is still captured. Next
// indexRun retries.
}
return { sig_hash, outcome };
}
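// Hypothetical compaction pass for the append-with-periodic-compact note
// above (sketch only, not part of this commit). Under append-only writes,
// duplicate rows per sig_hash accumulate; keep the newest and rewrite once.
async function compactSignatures(): Promise<void> {
  const rows = await readJsonl<PlaybookSignature>(SIGNATURES_FILE);
  const newest = new Map<string, PlaybookSignature>();
  for (const s of rows) {
    const prev = newest.get(s.sig_hash);
    if (!prev || s.last_seen >= prev.last_seen) newest.set(s.sig_hash, s);
  }
  await writeFile(
    join(KB_DIR, SIGNATURES_FILE),
    [...newest.values()].map(s => JSON.stringify(s)).join("\n") + "\n",
  );
}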
// Cosine similarity for neighbor lookup. Float32-in-memory is fine for
// O(thousands) signatures; swap to vectord HNSW once the corpus grows.
function cosine(a: number[], b: number[]): number {
if (a.length === 0 || a.length !== b.length) return 0;
let dot = 0, na = 0, nb = 0;
for (let i = 0; i < a.length; i++) {
dot += a[i] * b[i];
na += a[i] * a[i];
nb += b[i] * b[i];
}
return dot / (Math.sqrt(na) * Math.sqrt(nb) + 1e-12);
}
// Find the k nearest-neighbor signatures for a target spec, returning
// neighbor sigs + their outcome history. This is what the recommender
// feeds to the overview model.
export async function findNeighbors(spec: any, k = 5): Promise<Array<{
sig: PlaybookSignature;
similarity: number;
outcomes: RunOutcome[];
}>> {
await ensureKb();
const digest = specDigest(spec);
const targetHash = computeSignature(spec);
const resp = await fetch(`${SIDECAR}/embed`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ texts: [digest] }),
});
if (!resp.ok) return [];
const data: any = await resp.json();
const targetVec: number[] = data.embeddings?.[0] ?? [];
const sigs = await readJsonl<PlaybookSignature>(SIGNATURES_FILE);
const outcomes = await readJsonl<RunOutcome>(OUTCOMES_FILE);
const ranked = sigs
.filter(s => s.sig_hash !== targetHash) // don't recommend against self
.map(s => ({ sig: s, similarity: cosine(targetVec, s.embedding) }))
.sort((a, b) => b.similarity - a.similarity)
.slice(0, k);
return ranked.map(r => ({
sig: r.sig,
similarity: r.similarity,
outcomes: outcomes.filter(o => o.sig_hash === r.sig.sig_hash),
}));
}
// Detect fail→improved pairs within outcomes for the same signature:
// a later run that cleared more events than an earlier, partially-failed
// one. Diff the models between them; that delta is the correction.
export async function detectErrorCorrections(): Promise<Array<{
sig_hash: string;
before: RunOutcome;
after: RunOutcome;
models_changed: Record<string, { from: string; to: string }>;
}>> {
const outcomes = await readJsonl<RunOutcome>(OUTCOMES_FILE);
const bySig = new Map<string, RunOutcome[]>();
for (const o of outcomes) {
const arr = bySig.get(o.sig_hash) ?? [];
arr.push(o);
bySig.set(o.sig_hash, arr);
}
const corrections: any[] = [];
for (const [sig, runs] of bySig) {
runs.sort((a, b) => a.created_at.localeCompare(b.created_at));
for (let i = 1; i < runs.length; i++) {
const prev = runs[i - 1];
const cur = runs[i];
if (prev.ok_events < prev.total_events && cur.ok_events > prev.ok_events) {
const changed: Record<string, { from: string; to: string }> = {};
for (const k of ["executor", "reviewer", "overview"] as const) {
if (prev.models[k] !== cur.models[k]) {
changed[k] = { from: prev.models[k], to: cur.models[k] };
}
}
corrections.push({ sig_hash: sig, before: prev, after: cur, models_changed: changed });
}
}
}
return corrections;
}
// Generate a pathway recommendation for a target spec. Reads k-NN
// signatures + their outcome history, asks an overview model to
// synthesize "best path forward", writes to recommendations file.
export async function recommendFor(
spec: any,
opts: { overview_model?: string; cloud?: boolean; k?: number } = {},
): Promise<PathwayRecommendation | null> {
await ensureKb();
const k = opts.k ?? 5;
const overviewModel = opts.overview_model ?? "gpt-oss:20b";
const cloud = opts.cloud ?? false;
const neighbors = await findNeighbors(spec, k);
const corrections = await detectErrorCorrections();
const targetHash = computeSignature(spec);
if (neighbors.length === 0) {
// Cold start — no history. Write a minimal rec so next run still
// gets "we looked" instead of a missing file.
const rec: PathwayRecommendation = {
sig_hash: targetHash,
generated_at: new Date().toISOString(),
generated_by_model: overviewModel,
confidence: "low",
rationale: "No prior runs in KB — first time this signature is seen. Use default model matrix.",
top_models: {},
budget_hints: {},
pathway_notes: "No neighbor history to draw on. Proceed with current config; KB will have data after this run.",
neighbors_consulted: [],
};
await appendFile(join(KB_DIR, RECOMMENDATIONS_FILE), JSON.stringify(rec) + "\n");
return rec;
}
// Build the prompt. Include target spec digest, neighbor digests with
// their outcome stats, and recent error corrections. Ask for
// structured output so we can parse it.
const neighborBlock = neighbors.map(n => {
const best = n.outcomes.reduce((a, b) =>
a && a.ok_events / Math.max(1, a.total_events) >= b.ok_events / Math.max(1, b.total_events) ? a : b,
n.outcomes[0]);
const avgOk = n.outcomes.length > 0
? (n.outcomes.reduce((s, o) => s + o.ok_events, 0) / n.outcomes.length).toFixed(1)
: "?";
return `- sig ${n.sig.sig_hash} sim=${n.similarity.toFixed(3)} (${n.sig.events_digest.slice(0, 100)}): ${n.outcomes.length} runs, avg ${avgOk}/${best?.total_events ?? "?"} ok; best models: exec=${best?.models.executor ?? "?"} review=${best?.models.reviewer ?? "?"}`;
}).join("\n");
const correctionBlock = corrections.slice(-3).map(c =>
`- sig ${c.sig_hash}: ${c.before.ok_events}/${c.before.total_events} → ${c.after.ok_events}/${c.after.total_events}; changed=${JSON.stringify(c.models_changed)}`
).join("\n") || "(no corrections observed yet)";
const prompt = `You are the pathway recommender for a staffing coordinator agent system. A new scenario is about to run. Your job: use history of similar runs to recommend the best configuration.
TARGET SCENARIO:
${specDigest(spec)}
${k} NEAREST-NEIGHBOR SIGNATURES FROM HISTORY (by cosine similarity):
${neighborBlock}
RECENT ERROR CORRECTIONS (fail→succeed deltas on same signature):
${correctionBlock}
Your output MUST be a valid JSON object with this shape (and nothing else: no prose before or after):
{
"confidence": "high" | "medium" | "low",
"rationale": "2-3 sentence explanation of what pattern you saw",
"top_models": {"executor": "...", "reviewer": "...", "overview": "..."},
"budget_hints": {"executor_max_tokens": 800, "reviewer_max_tokens": 600, "executor_think": false},
"pathway_notes": "concrete pre-run advice — what to pre-fetch, what to avoid, buffer sizes"
}
Respond with ONLY the JSON object.`;
let raw = "";
try {
raw = await generateContinuable(overviewModel, prompt, {
max_tokens: 1200,
shape: "json",
cloud,
max_continuations: 2,
});
} catch {
// Generation failed; caller treats a null rec as "skip this round".
return null;
}
let parsed: any;
try {
const m = raw.match(/\{[\s\S]*\}/);
parsed = m ? JSON.parse(m[0]) : null;
} catch {
parsed = null;
}
if (!parsed) return null;
const rec: PathwayRecommendation = {
sig_hash: targetHash,
generated_at: new Date().toISOString(),
generated_by_model: overviewModel,
confidence: parsed.confidence ?? "low",
rationale: String(parsed.rationale ?? "").slice(0, 1000),
top_models: parsed.top_models ?? {},
budget_hints: parsed.budget_hints ?? {},
pathway_notes: String(parsed.pathway_notes ?? "").slice(0, 2000),
neighbors_consulted: neighbors.map(n => n.sig.sig_hash),
};
await appendFile(join(KB_DIR, RECOMMENDATIONS_FILE), JSON.stringify(rec) + "\n");
return rec;
}
// Read-back at scenario start: find the newest recommendation for this
// exact sig_hash. Recommendations written for merely similar signatures
// are not consulted here; returns null if no exact match exists.
export async function loadRecommendation(spec: any): Promise<PathwayRecommendation | null> {
await ensureKb();
const targetHash = computeSignature(spec);
const recs = await readJsonl<PathwayRecommendation>(RECOMMENDATIONS_FILE);
const matching = recs.filter(r => r.sig_hash === targetHash);
if (matching.length === 0) return null;
matching.sort((a, b) => b.generated_at.localeCompare(a.generated_at));
return matching[0];
}
// Record a config snapshot. Called whenever models.json or the active
// model assignments change, so error-correction diffs have history to
// reference beyond the outcomes file.
export async function snapshotConfig(
models_json_hash: string,
active_models: Record<string, string>,
why: string,
): Promise<void> {
await ensureKb();
await appendFile(
join(KB_DIR, CONFIG_SNAPSHOTS_FILE),
JSON.stringify({
at: new Date().toISOString(),
models_json_hash,
active_models,
why,
}) + "\n",
);
}

[file header lost in capture]

@@ -49,8 +49,8 @@ import {
callTool,
} from "./agent.ts";
const EXECUTOR_MODEL = "mistral:latest";
const REVIEWER_MODEL = "qwen2.5:latest";
const EXECUTOR_MODEL = "qwen3.5:latest";
const REVIEWER_MODEL = "qwen3:latest";
const VERIFIER_MODEL = "qwen2.5:latest";
const PROFILE_ID = "staffing-recruiter";
const INDEX_NAME = "workers_500k_v1";
@@ -175,7 +175,7 @@ async function buildPhase(task: TaskSpec, prefix: string): Promise<BuildResult>
while (turn < MAX_TURNS && !sealed) {
turn += 1;
const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), { temperature: 0.2, max_tokens: 600 });
const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), { temperature: 0.2, max_tokens: 1200 });
const execAction = parseAction(execRaw, "executor");
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: execAction.kind as any, content: execAction });
@@ -192,7 +192,7 @@ async function buildPhase(task: TaskSpec, prefix: string): Promise<BuildResult>
}
}
const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), { temperature: 0.1, max_tokens: 400 });
const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), { temperature: 0.1, max_tokens: 1000 });
const revAction = parseAction(revRaw, "reviewer");
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "critique", content: revAction });

[file header lost in capture]

@@ -25,8 +25,8 @@ import {
import { mkdir, writeFile } from "node:fs/promises";
import { join } from "node:path";
const EXECUTOR_MODEL = "mistral:latest";
const REVIEWER_MODEL = "qwen2.5:latest";
const EXECUTOR_MODEL = "qwen3.5:latest";
const REVIEWER_MODEL = "qwen3:latest";
const MAX_TURNS = 12; // executor turns; reviewer gets one per
const MAX_CONSECUTIVE_DRIFTS = 3; // drift-cycle blown → give up
@@ -148,7 +148,7 @@ async function main() {
// --- EXECUTOR TURN ---
const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), {
temperature: 0.2,
max_tokens: 600,
max_tokens: 1200,
});
let execAction: Action;
try {
@@ -191,7 +191,7 @@ async function main() {
// --- REVIEWER TURN ---
const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), {
temperature: 0.1,
max_tokens: 400,
max_tokens: 1000,
});
let revAction: Action;
try {

[file header lost in capture]

@@ -32,8 +32,8 @@ import {
callTool,
} from "./agent.ts";
const EXECUTOR_MODEL = "mistral:latest";
const REVIEWER_MODEL = "qwen2.5:latest";
const EXECUTOR_MODEL = "qwen3.5:latest";
const REVIEWER_MODEL = "qwen3:latest";
const MAX_TURNS = 12;
const MAX_CONSECUTIVE_DRIFTS = 3;
const INDEX_NAME = "workers_500k_v1";
@@ -75,7 +75,7 @@ async function runOrchestrator(task: TaskSpec, prefix: string): Promise<RunResul
turn += 1;
// Executor
const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), { temperature: 0.2, max_tokens: 600 });
const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), { temperature: 0.2, max_tokens: 1200 });
const execAction = parseAction(execRaw, "executor");
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: execAction.kind as any, content: execAction });
@@ -95,7 +95,7 @@ async function runOrchestrator(task: TaskSpec, prefix: string): Promise<RunResul
}
// Reviewer
const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), { temperature: 0.1, max_tokens: 400 });
const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), { temperature: 0.1, max_tokens: 1000 });
const revAction = parseAction(revRaw, "reviewer");
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "critique", content: revAction });

[file header lost in capture]

@@ -29,23 +29,24 @@ import {
sqlQuery,
generate,
generateCloud,
generateContinuable,
parseAction,
executorPrompt,
reviewerPrompt,
GATEWAY,
} from "./agent.ts";
import { indexRun, recommendFor, loadRecommendation, type PathwayRecommendation } from "./kb.ts";
import { mkdir, writeFile, appendFile } from "node:fs/promises";
import { join } from "node:path";
// 2026-04-20 — reverted to mistral executor after trying qwen2.5.
// qwen2.5 emits malformed JSON (trailing `)` garbage, unterminated
// strings) when asked for tool calls. mistral drops fields occasionally
// but produces valid JSON. With optional `question` default + lean
// prompt + schema lock, mistral seals baseline + recurring reliably.
// Complex scenarios (5-fill, emergency, misplacement) remain flaky —
// real Phase 20+ problem (larger model or constrained decoding needed).
const EXECUTOR_MODEL = "mistral:latest";
const REVIEWER_MODEL = "qwen2.5:latest";
// 2026-04-21 — executor is now qwen3.5:latest (9.7B, 262K context,
// thinking model, emits clean JSON). Replaces mistral, which produced
// malformed JSON on complex SQL filters (bare IN-clause identifiers,
// unclosed braces) regardless of prompt — decoder-level bug that all
// 5 events hit across 4 A/B test runs. qwen3.5 tested clean on first
// try with 800 max_tokens.
const EXECUTOR_MODEL = "qwen3.5:latest";
const REVIEWER_MODEL = "qwen3:latest";
const DRAFT_MODEL = "qwen2.5:latest"; // artifact generation; short outputs
// T3 overview tier. Called sparingly — NOT per tool call. Two insertion
@@ -81,9 +82,16 @@ const T3_DISABLED = process.env.LH_T3_DISABLE === "1";
// Dispatcher: route T3 calls to local sidecar or Ollama Cloud depending
// on the LH_OVERVIEW_CLOUD flag. Hot-path T1/T2 always stay local.
// T3 outputs are free-form prose (lesson/hint), so shape=text — the
// continuation primitive treats any non-empty response as complete.
async function overviewGenerate(prompt: string, opts: { temperature?: number; max_tokens?: number } = {}): Promise<string> {
if (OVERVIEW_CLOUD) return generateCloud(OVERVIEW_MODEL, prompt, opts);
return generate(OVERVIEW_MODEL, prompt, opts);
return generateContinuable(OVERVIEW_MODEL, prompt, {
temperature: opts.temperature,
max_tokens: opts.max_tokens ?? 1000,
shape: "text",
max_continuations: 2,
cloud: OVERVIEW_CLOUD,
});
}
const MAX_TURNS = 14;
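generateContinuable is exported by agent.ts, which this diff touches only at the import. Below is a minimal sketch of the brace-balancing loop the comments further down describe; every name in it is assumed, with generate standing in for the single-shot call:

// Hypothetical sketch of shape:"json" continuation, not the agent.ts code.
async function continuableJsonSketch(
  model: string,
  prompt: string,
  opts: { max_tokens: number; max_continuations: number },
): Promise<string> {
  let combined = await generate(model, prompt, { max_tokens: opts.max_tokens });
  for (let n = 1; n <= opts.max_continuations && !jsonComplete(combined); n++) {
    // Feed the partial back as scratchpad and ask only for the remainder.
    combined += await generate(
      model,
      `${prompt}\n\nPARTIAL OUTPUT (continue exactly where this stops):\n${combined}`,
      { max_tokens: opts.max_tokens },
    );
  }
  return combined;
}

function jsonComplete(s: string): boolean {
  const m = s.match(/\{[\s\S]*\}/); // same extraction the parsers here use
  if (!m) return false;
  try { JSON.parse(m[0]); return true; } catch { return false; }
}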
@@ -147,6 +155,7 @@ interface ScenarioContext {
results: EventResult[];
gap_signals: Array<{ event: string; category: string; detail: string }>;
prior_lessons: PriorLesson[];
pathway_rec?: PathwayRecommendation | null;
}
interface PriorLesson {
@@ -365,10 +374,26 @@ async function runAgentFill(
while (turn < MAX_TURNS && !sealed) {
turn += 1;
const execRaw = await generate(
// generateContinuable: if the model truncates mid-JSON (thinking
// ate the budget, or payload was just long), auto-continue with the
// partial as scratchpad until braces balance and JSON parses.
// No more "bump max_tokens until it stops truncating" tourniquet.
// think:false — executor emits structured JSON, doesn't need hidden
// reasoning. Burning ~650 thinking tokens on a 400-token JSON was
// exactly the bug we just solved.
const execRaw = await generateContinuable(
EXECUTOR_MODEL,
withExtras(executorPrompt(task, log)),
{ temperature: 0.2, max_tokens: 600 },
{
temperature: 0.2,
max_tokens: 800,
shape: "json",
max_continuations: 3,
think: false,
on_continuation: (n, len) =>
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "note",
content: { continuation: n, combined_chars: len } }),
},
);
let execAction: Action;
try {
@@ -419,10 +444,19 @@ async function runAgentFill(
}
}
const revRaw = await generate(
const revRaw = await generateContinuable(
REVIEWER_MODEL,
withExtras(reviewerPrompt(task, log)),
{ temperature: 0.1, max_tokens: 400 },
{
temperature: 0.1,
max_tokens: 600,
shape: "json",
max_continuations: 3,
think: false,
on_continuation: (n, len) =>
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "note",
content: { continuation: n, combined_chars: len } }),
},
);
let revAction: Action;
try {
@@ -548,7 +582,14 @@ CAST(reliability AS DOUBLE) > 0.7.`;
).join("\n")
: "";
return `${schemaLock}\n\nEVENT FOCUS:\n${base}${priorHint}`;
// Phase 22 pathway recommendation — if the KB synthesized a "best
// path" from neighbor runs, inject it as concrete pre-run guidance.
// Keep terse; the full rationale lives in the KB file.
const pathwayHint = ctx.pathway_rec && ctx.pathway_rec.pathway_notes
? `\n\nKB PATHWAY RECOMMENDATION (synthesized from ${ctx.pathway_rec.neighbors_consulted.length} neighbor runs, confidence=${ctx.pathway_rec.confidence}):\n${ctx.pathway_rec.pathway_notes.slice(0, 600)}`
: "";
return `${schemaLock}\n\nEVENT FOCUS:\n${base}${priorHint}${pathwayHint}`;
}
// =================== Artifact generation ===================
@@ -867,7 +908,10 @@ HINT: <hint>`;
let text = "";
try {
text = await overviewGenerate(prompt, { temperature: 0.2, max_tokens: 600 });
// overviewGenerate routes through generateContinuable — if thinking
// ate the initial budget, it auto-continues rather than requiring
// us to guess a safe max_tokens upfront.
text = await overviewGenerate(prompt, { temperature: 0.2, max_tokens: 800 });
} catch (e) {
return {
after_event: event.at,
@@ -1066,6 +1110,7 @@ async function writeRetrospective(ctx: ScenarioContext): Promise<void> {
// =================== Main driver ===================
async function main() {
const runStart = Date.now();
const specPath = process.argv[2];
const spec: ScenarioSpec = specPath
? JSON.parse(await Bun.file(specPath).text())
@@ -1077,6 +1122,18 @@ async function main() {
const prior_lessons = await loadPriorLessons(spec);
// Phase 22 KB — load any pathway recommendation for this signature.
// The recommender is called at END of prior runs and synthesizes
// configuration + pathway notes from nearest-neighbor history.
// Nothing on first run (cold start); populates over time.
const pathwayRec = await loadRecommendation(spec).catch(() => null);
if (pathwayRec) {
console.log(`▶ KB recommendation loaded: confidence=${pathwayRec.confidence} from ${pathwayRec.neighbors_consulted.length} neighbors`);
if (pathwayRec.pathway_notes) {
console.log(` pathway: ${pathwayRec.pathway_notes.slice(0, 120)}${pathwayRec.pathway_notes.length > 120 ? "…" : ""}`);
}
}
const ctx: ScenarioContext = {
spec,
out_dir,
@@ -1084,6 +1141,7 @@ async function main() {
results: [],
gap_signals: [],
prior_lessons,
pathway_rec: pathwayRec,
};
// Initialize output files
@@ -1219,6 +1277,30 @@ async function main() {
await writeRetrospective(ctx);
// Phase 22 KB — index this run + synthesize recommendation for next
// time this signature (or similar ones) show up. Event-driven cycle:
// run ends → KB updates → next run reads rec at startup.
try {
const elapsed = (Date.now() - runStart) / 1000;
const { sig_hash } = await indexRun(out_dir, spec, {
executor: EXECUTOR_MODEL,
reviewer: REVIEWER_MODEL,
overview: OVERVIEW_MODEL,
overview_cloud: OVERVIEW_CLOUD,
}, elapsed);
console.log(`▶ KB indexed: sig=${sig_hash} (${elapsed.toFixed(1)}s)`);
const newRec = await recommendFor(spec, {
overview_model: OVERVIEW_MODEL,
cloud: OVERVIEW_CLOUD,
k: 5,
});
if (newRec) {
console.log(`▶ KB recommendation written: confidence=${newRec.confidence} (${newRec.neighbors_consulted.length} neighbors consulted)`);
}
} catch (e) {
console.log(` (KB update skipped: ${(e as Error).message})`);
}
const okCount = ctx.results.filter(r => r.ok).length;
if (okCount < ctx.results.length) {
console.log(`\n⚠ ${okCount}/${ctx.results.length} events succeeded. See ${out_dir}/report.md for gaps.`);