root 5d49967833 multi_coord_stress: full Langfuse coverage — every phase + every call
Phase 1c-only tracing (commit 7e6431e) was the proof-of-concept.
This commit threads tracing through every phase: baseline / fresh-
resume / inbox burst / surge / swap / merge / handover (verbatim +
paraphrase) / split / reissue. Each phase is a parent span; each
matrix.search / LLM call inside is a child span.

Refactor:
- One run-level trace is created at driver startup.
- New startPhase(name, hour, meta) helper emits a phase span as a
  child of the run trace; subsequent emitSpan calls nest under it.
- New tracedSearch(spanName, query, corpora, ...) wraps matrixSearch
  with span emission. Every search call site replaced with this so
  the input/output JSON (query, corpora, k, playbook, exclude_n →
  top-K ids, top1 distance, boost/inject counts) lands in Langfuse.
- Phase 4b's paraphrase generation also emits llm.paraphrase spans.
- Phase 1c's existing inline span emission converted to use the new
  helpers (no more inboxTraceID variable).

Run #011 result: trace landed at http://localhost:3001 with 111
observations attached. Span breakdown:
  phase.* parents:         9 (one per phase that ran)
  matrix.search.baseline:  10
  matrix.search.fresh_verify: 3 (top-1 confirmed for all 3 fresh)
  observerd.inbox.record:  6
  llm.parse_demand:        6
  matrix.search.inbox:     6
  llm.judge_top1:          6
  matrix.search.surge:     12
  matrix.search.swap_orig: 1
  matrix.search.swap_replace: 1
  matrix.search.merge:     6
  matrix.search.handover_verbatim: 4
  llm.paraphrase:          4
  matrix.search.handover_paraphrase: 4
  matrix.search.split:     4
  matrix.search.reissue:   12
  matrix.search.reissue_retrieval_only: 12
  ─────────────
  Total:                   111

Browse: http://localhost:3001 → Traces → "multi_coord_stress run"
Each phase is a collapsible section showing per-call timing and
input/output JSON. Operators can drill into any single retrieval
to see exactly what query was issued and what came back.

All other metrics held: diversity 0.026, determinism 1.000,
verbatim handover 4/4, paraphrase handover 4/4, fresh-resume 3/3
at top-1 (two-tier index), 200-worker swap Jaccard 0.000.

This is the FULL TEST J asked for — every action in the run
visible in Langfuse, full input/output drilldown.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 16:43:32 -05:00

1398 lines
53 KiB
Go

// Multi-coordinator stress harness — Phase 1 of the 48-hour mock.
//
// Three coordinators (Alice, Bob, Carol) each own a contract with a
// different demand profile. They run queries against the matrix
// indexer with separate playbook namespaces. The harness fires
// scenario phases (baseline → surge → merge → handover → split) and
// captures every response so we can verify:
//
// 1. Diversity — different (coord, contract, role) triples should
// surface DIFFERENT top-K worker IDs. If everything returns the
// same handful of workers, the system is "cycling" not "locking
// into scenarios."
// 2. Non-determinism — same query reissued should return near-
// identical top-K (controlled variance from HNSW + judge, if any).
// 3. Learning — after Alice records playbook entries for her
// contract's queries, Bob takes over the same contract using
// Alice's playbook namespace; Alice's recordings should surface
// in Bob's results.
//
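// Phase 1 deliberately skips: time-based event clock (events fire
// sequentially) and email/SMS ingest (no integration yet). Those are
// Phase 2/3. Langfuse tracing is wired in but optional: it is enabled
// through the -langfuse-env credentials file and skipped when that
// file cannot be loaded.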
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"time"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse"
)
// ── data shapes ──────────────────────────────────────────────────
// Demand is one staffing requirement inside a Contract. The fields
// mirror the JSON keys of the contract files; how they are turned into
// query text is decided by buildQuery (not shown in this section).
type Demand struct {
Role string `json:"role"`
Count int `json:"count"`
Skills []string `json:"skills"`
Certs []string `json:"certs"`
// InRoster is a pointer so that an absent JSON key (nil) can be told
// apart from an explicit false; consumers are expected to treat nil
// as true.
InRoster *bool `json:"in_roster,omitempty"` // nil = assume true
}
// Contract is one client contract: identifying fields plus the list of
// role demands to staff. main assigns the first three loaded contracts
// to the three coordinators, one each.
type Contract struct {
Name string `json:"name"`
Client string `json:"client"`
Location string `json:"location"`
Shift string `json:"shift"`
// Demand holds one entry per role to staff. The swap phase in main
// relies on entry 0 of the first contract being the warehouse role.
Demand []Demand `json:"demand"`
}
// Coordinator identifies one simulated coordinator by name and by the
// playbook corpus (namespace) that their searches and playbook
// recordings are issued against.
type Coordinator struct {
	Name, PlaybookCorpus string
}
// ── matrix.search wire shapes ────────────────────────────────────
// matrixSearchReq is the JSON request shape for matrix.search. The
// playbook and exclusion fields carry omitempty, so they are left out
// of the encoded body when unset.
// NOTE(review): presumably built and sent by matrixSearch, which is
// outside this section — confirm there.
type matrixSearchReq struct {
QueryText string `json:"query_text"`
Corpora []string `json:"corpora"`
K int `json:"k"`
UsePlaybook bool `json:"use_playbook,omitempty"`
PlaybookCorpus string `json:"playbook_corpus,omitempty"`
ExcludeIDs []string `json:"exclude_ids,omitempty"`
}
// matrixResult is a single hit in a matrix.search response: the hit's
// id, its distance to the query, and the corpus it came from.
type matrixResult struct {
ID string `json:"id"`
Distance float32 `json:"distance"`
Corpus string `json:"corpus"`
// Metadata is kept as raw JSON; this type does not decode it.
Metadata json.RawMessage `json:"metadata,omitempty"`
}
// matrixResp is the JSON response shape for matrix.search.
type matrixResp struct {
// Results is the ranked hit list; index 0 is treated as top-1 by the
// phases in main.
Results []matrixResult `json:"results"`
PerCorpusCounts map[string]int `json:"per_corpus_counts"`
// PlaybookBoosted and PlaybookInjected are reported in the trace
// spans as the boost/inject counts of a search.
PlaybookBoosted int `json:"playbook_boosted,omitempty"`
PlaybookInjected int `json:"playbook_injected,omitempty"`
}
// ── event capture ────────────────────────────────────────────────
// ResultRef is the trimmed-down form of a search hit stored in
// Event.TopK: rank, id, corpus and distance, without the hit's
// metadata.
// NOTE(review): whether Rank is 0- or 1-based is decided by
// captureEvent, which is outside this section — confirm there.
type ResultRef struct {
Rank int `json:"rank"`
ID string `json:"id"`
Corpus string `json:"corpus"`
Distance float32 `json:"distance"`
}
// Event is one captured search in the run: which phase and coordinator
// issued it, the query and playbook settings used, and the top-K that
// came back. main appends events to Output.Events in the order they
// fire.
type Event struct {
// Phase is the label of the scenario phase (e.g. "baseline", "surge",
// "handover"). Later phases in main filter events on this value.
Phase string `json:"phase"`
Hour int `json:"hour"` // operational-narrative time label, not real wall clock
Coordinator string `json:"coordinator"`
Contract string `json:"contract"`
Role string `json:"role"`
Query string `json:"query"`
SurgeMultiplier int `json:"surge_multiplier,omitempty"`
UsePlaybook bool `json:"use_playbook"`
PlaybookCorpus string `json:"playbook_corpus,omitempty"`
// ExcludeIDs is filled in by the swap phase with the originally
// placed worker IDs that the replacement search excluded.
ExcludeIDs []string `json:"exclude_ids,omitempty"`
TopK []ResultRef `json:"top_k"`
PerCorpusCounts map[string]int `json:"per_corpus_counts,omitempty"`
PlaybookBoosted int `json:"playbook_boosted,omitempty"`
PlaybookInjected int `json:"playbook_injected,omitempty"`
// JudgeRating: 1-5 quality score on top-1 result against the
// original inbox body (not the LLM-parsed query). Lets us flag
// the case where LLM parsing produces a tight-distance match
// but the result doesn't actually fit the original ask.
// 0 = unrated, 1-5 = judge verdict.
JudgeRating int `json:"judge_rating,omitempty"`
// Note is a free-form annotation that individual phases attach after
// capture (e.g. the fresh-worker rank, or the swap Jaccard).
Note string `json:"note,omitempty"`
// NOTE(review): presumably stamped by captureEvent at capture time —
// confirm there.
TimestampUnixNano int64 `json:"ts_ns"`
}
// Output is the report for one run. main fills it in phase by phase and
// writes it as indented JSON to the -out path at the end.
type Output struct {
Coordinators []string `json:"coordinators"`
Contracts []string `json:"contracts"`
// Events holds every captured search, in firing order.
Events []Event `json:"events"`
Diversity Diversity `json:"diversity"`
Determinism Determ `json:"determinism"`
Learning Learning `json:"learning"`
// GeneratedAt is set in UTC when main initialises the report, i.e.
// before the phases run.
GeneratedAt time.Time `json:"generated_at"`
}
// Diversity reports how distinct the top-K worker sets are across
// (coord, contract, role) triples that SHOULD differ. It carries the
// mean Jaccard similarity for matched-role-across-contracts pairs and
// for different-roles-same-contract pairs (lower is more diverse),
// together with the number of pairs behind each mean. It is produced
// by computeDiversity from the captured events.
type Diversity struct {
SameRoleAcrossContractsMeanJaccard float64 `json:"same_role_across_contracts_mean_jaccard"`
DifferentRolesSameContractMeanJaccard float64 `json:"different_roles_same_contract_mean_jaccard"`
NumPairsSameRoleAcrossContracts int `json:"num_pairs_same_role_across_contracts"`
NumPairsDifferentRolesSameContract int `json:"num_pairs_different_roles_same_contract"`
}
// Determ reports the reissue check: the same query issued again should
// return a near-identical top-K. Jaccard close to 1.0 = stable /
// deterministic, < 0.95 = some HNSW or judge variance.
type Determ struct {
// MeanJaccard is the mean over all reissued pairs.
MeanJaccard float64 `json:"mean_jaccard"`
// NumReissuedPairs is the number of pairs that went into the mean.
NumReissuedPairs int `json:"num_reissued_pairs"`
}
// Learning reports the handover signal. After Alice records playbooks
// for her contract, Bob runs the same queries with Alice's playbook
// namespace. We measure: do Alice's recorded answer IDs surface in
// Bob's top-K?
//
// Two modes:
// - Verbatim handover: Bob runs Alice's exact queries (trivial case).
// - Paraphrase handover: Bob runs paraphrased queries against Alice's
// playbook (the hard case — does cosine on paraphrase find the
// recorded query's vector?). This is the multi-coord analog of the
// paraphrase reality test in playbook_lift.
type Learning struct {
HandoverQueriesRun int `json:"handover_queries_run"`
RecordedAnswersTop1Count int `json:"recorded_answers_top1_count"`
// RecordedAnswersTopKCount counts hits anywhere in the top-K, so it
// includes the top-1 hits as well.
RecordedAnswersTopKCount int `json:"recorded_answers_topk_count"`
// HandoverHitRate is the top-1 rate:
// RecordedAnswersTop1Count / HandoverQueriesRun (left at 0 when no
// handover queries ran).
HandoverHitRate float64 `json:"handover_hit_rate"`
// Paraphrase handover — only populated when --with-paraphrase-handover.
ParaphraseHandoverRun int `json:"paraphrase_handover_run,omitempty"`
ParaphraseTop1Count int `json:"paraphrase_top1_count,omitempty"`
ParaphraseTopKCount int `json:"paraphrase_topk_count,omitempty"`
// ParaphraseHandoverHitRate is the top-1 rate for the paraphrase
// phase: ParaphraseTop1Count / ParaphraseHandoverRun.
ParaphraseHandoverHitRate float64 `json:"paraphrase_handover_hit_rate,omitempty"`
}
// ── main ─────────────────────────────────────────────────────────
func main() {
var (
gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
contractsDir = flag.String("contracts", "tests/reality/contracts", "directory of contract JSON files")
corporaCSV = flag.String("corpora", "workers,ethereal_workers", "comma-separated matrix corpora")
k = flag.Int("k", 8, "top-k from matrix.search per query")
out = flag.String("out", "reports/reality-tests/multi_coord_stress_001.json", "output JSON path")
ollama = flag.String("ollama", "http://127.0.0.1:11434", "Ollama base URL (only used if --with-paraphrase-handover)")
judgeModel = flag.String("judge", "qwen2.5:latest", "Ollama model for paraphrase generation (only used if --with-paraphrase-handover)")
withParaphraseHandover = flag.Bool("with-paraphrase-handover", false, "after the verbatim handover phase, run a paraphrase handover phase: Bob runs paraphrased versions of Alice's queries against Alice's playbook")
langfuseEnv = flag.String("langfuse-env", "/etc/lakehouse/langfuse.env", "path to Langfuse credentials env file (empty = skip tracing)")
)
flag.Parse()
contracts, err := loadContracts(*contractsDir)
if err != nil {
log.Fatalf("load contracts: %v", err)
}
if len(contracts) < 3 {
log.Fatalf("need ≥3 contracts in %s, got %d", *contractsDir, len(contracts))
}
// First three contracts → coord assignments. Names are fixed so
// playbook corpora are stable across runs (rerun lands on same
// namespaces, exercising the persistence path indirectly).
coords := []Coordinator{
{Name: "alice", PlaybookCorpus: "playbook_alice"},
{Name: "bob", PlaybookCorpus: "playbook_bob"},
{Name: "carol", PlaybookCorpus: "playbook_carol"},
}
// Initial assignment: alice→alpha, bob→beta, carol→gamma.
assignments := map[string]*Contract{
"alice": &contracts[0],
"bob": &contracts[1],
"carol": &contracts[2],
}
corpora := strings.Split(*corporaCSV, ",")
hc := &http.Client{Timeout: 30 * time.Second}
ctx := context.Background()
_ = ctx
// Optional Langfuse client. Best-effort: missing env file or
// unreachable Langfuse just means traces don't go anywhere; the
// run still proceeds.
var lf *langfuse.Client
var runTraceID string
var currentPhaseSpanID string
if *langfuseEnv != "" {
if creds, err := loadLangfuseEnv(*langfuseEnv); err == nil {
lf = langfuse.New(creds.URL, creds.PublicKey, creds.SecretKey, nil)
log.Printf("[stress] Langfuse client live → %s", creds.URL)
runTraceID = lf.Trace(ctx, langfuse.TraceInput{
Name: "multi_coord_stress run",
Tags: []string{"stress", "multi-coord"},
Metadata: map[string]any{
"corpora": corpora,
"k": *k,
},
})
defer func() {
if err := lf.Flush(context.Background()); err != nil {
log.Printf("[stress] Langfuse final flush: %v", err)
}
}()
} else {
log.Printf("[stress] Langfuse skipped: %v", err)
}
}
// startPhase opens a new phase span under the run trace and stores its
// ID in currentPhaseSpanID, so that every emitSpan call made afterwards
// is parented to this phase. The span metadata is the phase's "hour"
// label plus any caller-supplied entries (a caller entry named "hour"
// wins). It returns nothing and is a no-op when Langfuse is not
// configured, so call sites need no nil checks.
startPhase := func(name string, hour int, meta map[string]any) {
	if lf == nil {
		return
	}
	phaseMeta := make(map[string]any, len(meta)+1)
	phaseMeta["hour"] = hour
	for key, val := range meta {
		phaseMeta[key] = val
	}
	phaseSpan := langfuse.SpanInput{
		TraceID:   runTraceID,
		Name:      name,
		Metadata:  phaseMeta,
		StartTime: time.Now(),
	}
	currentPhaseSpanID = lf.Span(ctx, phaseSpan)
}
// emitSpan records one finished span as a child of the current phase
// span. Callers capture `start` before the operation and call emitSpan
// once it has finished: the end time is stamped here, at call time, so
// the recorded duration covers the operation instead of being 0.
// No-op when Langfuse is not configured.
emitSpan := func(name string, start time.Time, input, output any, level string) {
	if lf != nil {
		child := langfuse.SpanInput{
			TraceID:   runTraceID,
			ParentID:  currentPhaseSpanID,
			Name:      name,
			Input:     input,
			Output:    output,
			StartTime: start,
			EndTime:   time.Now(),
			Level:     level,
		}
		lf.Span(ctx, child)
	}
}
// tracedSearch runs matrixSearch and emits one child span per call, so
// Langfuse captures every retrieval with its inputs (query, corpora,
// k, playbook settings, number of excluded IDs) and outputs (top-K
// ids, top-1 distance, boost/inject counts).
//
// Failure is fatal: the error is emitted as an ERROR span and the
// process exits through log.Fatalf. Callers therefore never receive a
// nil response and need no error handling of their own.
tracedSearch := func(spanName, query string, searchCorpora []string, usePlaybook bool, pbCorpus string, excludeIDs ...string) *matrixResp {
	start := time.Now()
	// One input payload, shared by the success span and the error span.
	spanInput := map[string]any{
		"query":           query,
		"corpora":         searchCorpora,
		"k":               *k,
		"use_playbook":    usePlaybook,
		"playbook_corpus": pbCorpus,
		"exclude_n":       len(excludeIDs),
	}
	resp, err := matrixSearch(hc, *gateway, query, searchCorpora, *k, usePlaybook, pbCorpus, excludeIDs...)
	if err != nil {
		emitSpan(spanName, start, spanInput, map[string]any{"error": err.Error()}, "ERROR")
		// log.Fatalf exits via os.Exit, which skips deferred calls —
		// including the final Langfuse flush registered at startup.
		// Flush explicitly so the ERROR span emitted above (and anything
		// else the client may still be holding) is sent before exit.
		if lf != nil {
			if ferr := lf.Flush(context.Background()); ferr != nil {
				log.Printf("[stress] Langfuse flush before exit: %v", ferr)
			}
		}
		log.Fatalf("[stress] %v", err)
	}
	topIDs := make([]string, 0, len(resp.Results))
	for _, r := range resp.Results {
		topIDs = append(topIDs, r.ID)
	}
	emitSpan(spanName, start, spanInput,
		map[string]any{"top_k_ids": topIDs, "top1_distance": firstDistance(resp.Results), "playbook_boosted": resp.PlaybookBoosted, "playbook_injected": resp.PlaybookInjected}, "")
	return resp
}
output := Output{
Coordinators: []string{"alice", "bob", "carol"},
Contracts: []string{contracts[0].Name, contracts[1].Name, contracts[2].Name},
GeneratedAt: time.Now().UTC(),
}
log.Printf("[stress] 3 coords, 3 contracts, k=%d, corpora=%v", *k, corpora)
// ── Phase 1: baseline ───────────────────────────────────────
// Each coord runs their own contract's role queries. Records
// playbook entries (top-1 of each as a synthetic "successful
// match" outcome) into their personal namespace.
log.Printf("[stress] phase 1: baseline")
startPhase("phase.baseline", 0, nil)
for _, coord := range coords {
c := assignments[coord.Name]
for _, d := range c.Demand {
q := buildQuery(c, d, 1)
resp := tracedSearch("matrix.search.baseline", q, corpora, true, coord.PlaybookCorpus)
ev := captureEvent("baseline", 0, coord.Name, c.Name, d.Role, q, 1, true, coord.PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
// Record top-1 as a successful playbook entry for this coord.
if len(resp.Results) > 0 {
if err := playbookRecord(hc, *gateway, q, resp.Results[0].ID, resp.Results[0].Corpus, 1.0, coord.PlaybookCorpus); err != nil {
log.Printf(" record (%s/%s): %v", coord.Name, d.Role, err)
}
}
}
}
// ── Phase 1b: new-resume injection (Hour 6) ─────────────────
// Mid-day, three new resumes arrive — workers with no prior
// history. We embed + add them to the workers vectord index,
// then verify they're findable by their unique skill marker.
// Tests the substrate's ability to absorb fresh candidates
// without restart.
log.Printf("[stress] phase 1b: new-resume injection (3 fresh workers, verify findable)")
startPhase("phase.new_resume_injection", 6, nil)
// Each fresh worker has a SEMANTIC query that should surface them
// based on the actual content of their resume — role + skills +
// location. nomic-embed-text is dense/semantic, NOT lexical, so a
// "find me FRESHTAG_..." style unique-substring query does NOT
// surface the fresh worker; the embedder weights rare substrings
// as low-information noise. The semantic query below represents
// what a real coordinator would actually issue.
freshWorkers := []struct {
ID string
Resume string
Verify string // semantic query expected to surface this worker
}{
{
ID: "fresh-001",
Resume: "Senior rigger with 12 years tower-crane signaling experience. NCCCO crane signal/rigger certification active. Chicago IL metro, available immediately. Construction-site rigging specialist.",
Verify: "Senior tower crane rigger NCCCO certification Chicago construction signaling",
},
{
ID: "fresh-002",
Resume: "Bilingual safety coordinator (Spanish + English). OSHA trainer credentials, 8 years manufacturing safety training delivery. Indianapolis IN. Manages multilingual crew safety briefings and incident documentation.",
Verify: "Bilingual Spanish English OSHA trainer safety coordinator Indianapolis manufacturing",
},
{
ID: "fresh-003",
Resume: "FAA Part 107 certified drone pilot. UAV site surveying with GIS mapping output for construction site progress reports. Chicago IL metro. 5 years aerial surveying for general contractors.",
Verify: "FAA Part 107 drone surveyor UAV pilot GIS construction site mapping Chicago",
},
}
const freshIdx = "fresh_workers"
if err := ensureFreshIndex(hc, *gateway, freshIdx, 768); err != nil {
log.Fatalf("ensure fresh_workers index: %v", err)
}
for _, fw := range freshWorkers {
if err := ingestFreshWorker(hc, *gateway, freshIdx, fw.ID, fw.Resume, map[string]any{
"name": fw.ID,
"role": "fresh-resume",
"source": "phase-1b-injection",
}); err != nil {
log.Fatalf("ingest fresh worker %s: %v", fw.ID, err)
}
}
// Verify queries search across main + fresh corpora — the small
// fresh corpus should surface the freshly-added worker because
// it has no recall competition there.
verifyCorpora := append([]string{}, corpora...)
verifyCorpora = append(verifyCorpora, freshIdx)
for _, fw := range freshWorkers {
resp := tracedSearch("matrix.search.fresh_verify", fw.Verify, verifyCorpora, false, "")
ev := captureEvent("new-resume-verify", 6, "system", "fresh-resume-pool", "fresh", fw.Verify, 1, false, "", resp)
// Find the fresh worker's rank in top-K (rank 0 = top-1).
freshRank := -1
for i, r := range resp.Results {
if r.ID == fw.ID {
freshRank = i
break
}
}
switch {
case freshRank == 0:
ev.Note = fmt.Sprintf("fresh worker %s at top-1 — semantic absorption working", fw.ID)
case freshRank > 0:
ev.Note = fmt.Sprintf("fresh worker %s at rank %d (in top-K but not top-1)", fw.ID, freshRank)
default:
ev.Note = fmt.Sprintf("fresh worker %s NOT in top-K (top-1 was %s) — embedder didn't surface fresh-resume content over existing population", fw.ID, resp.Results[0].ID)
}
output.Events = append(output.Events, ev)
}
// ── Phase 1c: inbox burst (Hour 9) ──────────────────────────
// Mid-morning, 6 incoming signals arrive — emails + SMS — each
// carrying a structured demand for the system to act on. Events
// fire in PRIORITY ORDER (urgent → high → medium). For each, we:
// 1. POST to /v1/observer/inbox so the witness loop records it
// 2. Run matrix.search using the embedded demand
// 3. Capture both as events
//
// Priority weighting matters because real coordinators triage
// urgent client-side asks before medium-priority background
// signals. The substrate doesn't enforce ordering today (callers
// fire in their preferred order); this phase verifies the
// recording surface and the search-from-inbox flow work.
log.Printf("[stress] phase 1c: inbox burst (6 events, priority-ordered)")
startPhase("phase.inbox_burst", 9, map[string]any{"event_count": 6})
type inboxEvent struct {
Priority string // "urgent" | "high" | "medium" | "low"
Type string // "email" | "sms"
Sender string
Subject string
Body string
Coord string
}
inboxEvents := []inboxEvent{
{
Priority: "urgent", Type: "email", Sender: "ops@northstar.com",
Subject: "URGENT: 50 forklift operators Cleveland Monday",
Body: "Need 50 forklift operators in Cleveland OH for Monday day shift. OSHA-30 + active forklift cert required. Current Milwaukee batch cannot relocate.",
Coord: "alice",
},
{
Priority: "urgent", Type: "email", Sender: "client@crossroads-mfg.com",
Subject: "URGENT: Production line down — need 30 production workers tonight",
Body: "Production line failure at Indianapolis IN site. Need 30 production workers swing shift starting tonight. Assembly + machine operation experience required.",
Coord: "bob",
},
{
Priority: "high", Type: "email", Sender: "supervisor@loop-construction.com",
Subject: "Need crane operator Chicago for 2-week project",
Body: "Crane operator with NCCCO certification needed for 2-week Chicago IL site project. Day shift. Mobile crane experience preferred.",
Coord: "carol",
},
{
Priority: "high", Type: "sms", Sender: "+1-555-0142",
Body: "Bilingual safety coord needed Indy plant ASAP. Spanish + English. OSHA trainer credential.",
Coord: "bob",
},
{
Priority: "medium", Type: "sms", Sender: "+1-555-0188",
Body: "Drone surveyor for Chicago site progress mapping. FAA Part 107.",
Coord: "carol",
},
{
Priority: "medium", Type: "email", Sender: "scheduling@northstar.com",
Subject: "FYI: warehouse worker capacity check Milwaukee",
Body: "Routine capacity check on Milwaukee warehouse worker pool — anyone with cold storage experience for next week?",
Coord: "alice",
},
}
// Sort by priority (urgent < high < medium < low for ordering).
prioRank := map[string]int{"urgent": 0, "high": 1, "medium": 2, "low": 3}
sort.SliceStable(inboxEvents, func(i, j int) bool {
return prioRank[inboxEvents[i].Priority] < prioRank[inboxEvents[j].Priority]
})
for _, ie := range inboxEvents {
// 1. Record inbox event at observerd
stepStart := time.Now()
if err := postInbox(hc, *gateway, ie.Type, ie.Sender, ie.Subject, ie.Body, ie.Priority, ie.Coord); err != nil {
log.Printf(" inbox record failed (%s): %v", ie.Priority, err)
continue
}
emitSpan("observerd.inbox.record", stepStart,
map[string]any{"type": ie.Type, "sender": ie.Sender, "priority": ie.Priority, "subject": ie.Subject, "body_chars": len(ie.Body), "coordinator": ie.Coord},
map[string]any{"accepted": true}, "")
// 2. LLM parses the body into a structured demand.
parseStart := time.Now()
parsed, perr := parseInboxDemand(hc, *ollama, *judgeModel, ie.Body)
if perr != nil {
emitSpan("llm.parse_demand", parseStart,
map[string]any{"body": ie.Body, "model": *judgeModel},
map[string]any{"error": perr.Error()}, "ERROR")
log.Printf(" inbox demand parse failed (%s): %v", ie.Priority, perr)
continue
}
emitSpan("llm.parse_demand", parseStart,
map[string]any{"body": ie.Body, "model": *judgeModel},
parsed, "")
// 3. Build a query string from the parsed demand and search.
query := parsed.AsQuery()
coord := coordByName(coords, ie.Coord)
searchStart := time.Now()
resp, err := matrixSearch(hc, *gateway, query, corpora, *k, true, coord.PlaybookCorpus)
if err != nil {
emitSpan("matrix.search.inbox", searchStart,
map[string]any{"query": query, "corpora": corpora, "k": *k},
map[string]any{"error": err.Error()}, "ERROR")
log.Printf(" inbox-triggered search failed (%s): %v", ie.Priority, err)
continue
}
topIDs := make([]string, 0, len(resp.Results))
for _, r := range resp.Results {
topIDs = append(topIDs, r.ID)
}
emitSpan("matrix.search.inbox", searchStart,
map[string]any{"query": query, "corpora": corpora, "k": *k, "playbook_corpus": coord.PlaybookCorpus},
map[string]any{"top_k_ids": topIDs, "top1_distance": firstDistance(resp.Results), "playbook_boosted": resp.PlaybookBoosted, "playbook_injected": resp.PlaybookInjected}, "")
ev := captureEvent("inbox-triggered-search", 9, ie.Coord, "inbox-burst", ie.Subject, query, 1, true, coord.PlaybookCorpus, resp)
parsedJSON, _ := json.Marshal(parsed)
ev.Note = fmt.Sprintf("inbox %s/%s from %s · LLM-parsed demand: %s", ie.Type, ie.Priority, ie.Sender, string(parsedJSON))
// 4. Judge re-rates top-1 against the ORIGINAL body.
if len(resp.Results) > 0 {
judgeStart := time.Now()
rating := judgeInboxResult(hc, *ollama, *judgeModel, ie.Body, resp.Results[0])
ev.JudgeRating = rating
emitSpan("llm.judge_top1", judgeStart,
map[string]any{"original_body": ie.Body, "top1_id": resp.Results[0].ID, "top1_corpus": resp.Results[0].Corpus},
map[string]any{"rating": rating}, "")
}
output.Events = append(output.Events, ev)
}
// ── Phase 2: surge ──────────────────────────────────────────
// Each coord's contract demand doubles. URGENT phrasing.
log.Printf("[stress] phase 2: surge (2x demand, urgent phrasing)")
startPhase("phase.surge", 12, nil)
for _, coord := range coords {
c := assignments[coord.Name]
for _, d := range c.Demand {
q := buildQuery(c, d, 2)
resp := tracedSearch("matrix.search.surge", q, corpora, true, coord.PlaybookCorpus)
ev := captureEvent("surge", 12, coord.Name, c.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
}
}
// ── Phase 2b: 200-worker swap (Hour 18) ──────────────────────
// Alpha's client says "the 200 workers you placed are unavailable
// — find replacements." We capture the top-K from the warehouse
// query, then re-issue the same query with those IDs excluded.
// Real product test: does the system find genuinely different
// candidates, or does it sit on the same population?
log.Printf("[stress] phase 2b: 200-worker swap (alpha warehouse — exclude originally placed)")
startPhase("phase.swap_200_workers", 18, nil)
warehouseDemand := contracts[0].Demand[0] // slot 0 is warehouse worker by contract design
swapQuery := buildQuery(&contracts[0], warehouseDemand, 1)
origResp := tracedSearch("matrix.search.swap_orig", swapQuery, corpora, false, "")
placedIDs := make([]string, 0, len(origResp.Results))
for _, r := range origResp.Results {
placedIDs = append(placedIDs, r.ID)
}
origEv := captureEvent("swap-original", 18, "alice", contracts[0].Name, warehouseDemand.Role, swapQuery, 1, false, "", origResp)
origEv.Note = fmt.Sprintf("captured %d originally-placed worker IDs", len(placedIDs))
output.Events = append(output.Events, origEv)
swapResp := tracedSearch("matrix.search.swap_replace", swapQuery, corpora, false, "", placedIDs...)
swapEv := captureEvent("swap-replace", 18, "alice", contracts[0].Name, warehouseDemand.Role, swapQuery, 1, false, "", swapResp)
swapEv.ExcludeIDs = placedIDs
swapIDs := make([]string, 0, len(swapResp.Results))
for _, r := range swapResp.Results {
swapIDs = append(swapIDs, r.ID)
}
swapJacc := jaccardStrings(placedIDs, swapIDs)
swapEv.Note = fmt.Sprintf("Jaccard(orig, swap) = %.3f (lower = better; 0 = fully replaced)", swapJacc)
output.Events = append(output.Events, swapEv)
// ── Phase 3: merge — alpha + beta combined under alice ──────
log.Printf("[stress] phase 3: merge (alpha + beta combined, alice handles)")
startPhase("phase.merge", 24, nil)
mergedDemand := append(append([]Demand{}, contracts[0].Demand...), contracts[1].Demand...)
for _, d := range mergedDemand {
mergedC := &Contract{Name: contracts[0].Name + "+" + contracts[1].Name, Location: contracts[0].Location + " + " + contracts[1].Location, Shift: "shared"}
q := buildQuery(mergedC, d, 1)
resp := tracedSearch("matrix.search.merge", q, corpora, true, coords[0].PlaybookCorpus)
ev := captureEvent("merge", 24, "alice", mergedC.Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
}
// ── Phase 4: handover — bob takes alpha contract, USING ─────
// alice's playbook namespace. Tests whether Alice's recordings
// surface in Bob's results when Bob runs Alice's contract.
log.Printf("[stress] phase 4: handover (bob takes alpha, using alice's playbook)")
startPhase("phase.handover_verbatim", 30, nil)
aliceRecordedAnswers := map[string]string{} // role → recorded answer id
for _, ev := range output.Events {
if ev.Phase == "baseline" && ev.Coordinator == "alice" && len(ev.TopK) > 0 {
aliceRecordedAnswers[ev.Role] = ev.TopK[0].ID
}
}
handoverHitsTop1 := 0
handoverHitsTopK := 0
handoverRun := 0
for _, d := range contracts[0].Demand {
q := buildQuery(&contracts[0], d, 1)
resp := tracedSearch("matrix.search.handover_verbatim", q, corpora, true, coords[0].PlaybookCorpus)
ev := captureEvent("handover", 30, "bob", contracts[0].Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
handoverRun++
recordedID, ok := aliceRecordedAnswers[d.Role]
if !ok {
continue
}
if len(ev.TopK) > 0 && ev.TopK[0].ID == recordedID {
handoverHitsTop1++
handoverHitsTopK++
} else {
for _, r := range ev.TopK {
if r.ID == recordedID {
handoverHitsTopK++
break
}
}
}
}
output.Learning.HandoverQueriesRun = handoverRun
output.Learning.RecordedAnswersTop1Count = handoverHitsTop1
output.Learning.RecordedAnswersTopKCount = handoverHitsTopK
if handoverRun > 0 {
output.Learning.HandoverHitRate = float64(handoverHitsTop1) / float64(handoverRun)
}
// ── Phase 4b: paraphrase handover ───────────────────────────
// Bob runs PARAPHRASED versions of Alice's queries against
// Alice's playbook. The verbatim handover above is the trivial
// case (identical queries → identical retrieval → playbook
// boost). The paraphrase handover is the real test: did Alice's
// institutional memory survive the wording change Bob would
// naturally introduce?
if *withParaphraseHandover {
log.Printf("[stress] phase 4b: paraphrase handover (bob runs paraphrased versions of alice's queries)")
startPhase("phase.handover_paraphrase", 36, nil)
pHandoverRun := 0
pTop1 := 0
pTopK := 0
for _, d := range contracts[0].Demand {
origQuery := buildQuery(&contracts[0], d, 1)
paraStart := time.Now()
paraphrase, err := generateParaphrase(hc, *ollama, *judgeModel, origQuery)
if err != nil {
emitSpan("llm.paraphrase", paraStart,
map[string]any{"original": origQuery, "model": *judgeModel},
map[string]any{"error": err.Error()}, "ERROR")
log.Printf(" paraphrase gen failed for %s: %v", d.Role, err)
continue
}
emitSpan("llm.paraphrase", paraStart,
map[string]any{"original": origQuery, "model": *judgeModel},
map[string]any{"paraphrase": paraphrase}, "")
resp := tracedSearch("matrix.search.handover_paraphrase", paraphrase, corpora, true, coords[0].PlaybookCorpus)
ev := captureEvent("handover-paraphrase", 36, "bob", contracts[0].Name, d.Role, paraphrase, 1, true, coords[0].PlaybookCorpus, resp)
ev.Note = "paraphrase of: " + origQuery
output.Events = append(output.Events, ev)
pHandoverRun++
recordedID, ok := aliceRecordedAnswers[d.Role]
if !ok {
continue
}
if len(ev.TopK) > 0 && ev.TopK[0].ID == recordedID {
pTop1++
pTopK++
} else {
for _, r := range ev.TopK {
if r.ID == recordedID {
pTopK++
break
}
}
}
}
output.Learning.ParaphraseHandoverRun = pHandoverRun
output.Learning.ParaphraseTop1Count = pTop1
output.Learning.ParaphraseTopKCount = pTopK
if pHandoverRun > 0 {
output.Learning.ParaphraseHandoverHitRate = float64(pTop1) / float64(pHandoverRun)
}
}
// ── Phase 5: split — surge re-distributed across 3 coords ──
log.Printf("[stress] phase 5: split (alpha surge spread across all 3 coords)")
startPhase("phase.split", 42, nil)
for i, d := range contracts[0].Demand {
coord := coords[i%len(coords)]
c := &contracts[0]
q := buildQuery(c, d, 2)
resp := tracedSearch("matrix.search.split", q, corpora, true, coord.PlaybookCorpus)
ev := captureEvent("split", 42, coord.Name, c.Name+"-share-"+coord.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
}
// ── Phase 6: non-determinism check ─────────────────────────
// Reissue each baseline query once and compare top-K Jaccard.
log.Printf("[stress] phase 6: non-determinism (reissue baselines, measure Jaccard)")
startPhase("phase.reissue", 48, nil)
jaccards := []float64{}
for _, ev := range output.Events {
if ev.Phase != "baseline" {
continue
}
resp := tracedSearch("matrix.search.reissue", ev.Query, corpora, false, "")
reissue := captureEvent("reissue", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", resp)
output.Events = append(output.Events, reissue)
// Determinism is measured retrieval-only: the reissue above and the
// second fetch below both run with the playbook OFF, and the Jaccard
// compares those two fresh top-K sets. The playbook-on baseline
// (ev.TopK) is not part of the comparison; including it would
// conflate retrieval stability with playbook stability.
freshRetrievalResp := tracedSearch("matrix.search.reissue_retrieval_only", ev.Query, corpora, false, "")
freshRetrievalEv := captureEvent("reissue-retrieval-only", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", freshRetrievalResp)
j := jaccardTopK(reissue.TopK, freshRetrievalEv.TopK)
jaccards = append(jaccards, j)
}
output.Determinism.NumReissuedPairs = len(jaccards)
output.Determinism.MeanJaccard = mean(jaccards)
// ── Phase 7: diversity analysis ─────────────────────────────
log.Printf("[stress] phase 7: diversity analysis")
output.Diversity = computeDiversity(output.Events)
// ── write ───────────────────────────────────────────────────
if err := os.MkdirAll(filepath.Dir(*out), 0o755); err != nil {
log.Fatalf("mkdir: %v", err)
}
bs, _ := json.MarshalIndent(output, "", " ")
if err := os.WriteFile(*out, bs, 0o644); err != nil {
log.Fatalf("write %s: %v", *out, err)
}
log.Printf("[stress] DONE — events=%d", len(output.Events))
log.Printf("[stress] diversity: same-role-across-contracts mean Jaccard = %.3f (n=%d)",
output.Diversity.SameRoleAcrossContractsMeanJaccard, output.Diversity.NumPairsSameRoleAcrossContracts)
log.Printf("[stress] different-roles-same-contract mean Jaccard = %.3f (n=%d)",
output.Diversity.DifferentRolesSameContractMeanJaccard, output.Diversity.NumPairsDifferentRolesSameContract)
log.Printf("[stress] determinism: mean Jaccard on reissue = %.3f (n=%d)",
output.Determinism.MeanJaccard, output.Determinism.NumReissuedPairs)
log.Printf("[stress] learning verbatim: handover hit rate (top-1) = %d/%d = %.0f%%",
output.Learning.RecordedAnswersTop1Count, output.Learning.HandoverQueriesRun,
output.Learning.HandoverHitRate*100)
if output.Learning.ParaphraseHandoverRun > 0 {
log.Printf("[stress] learning paraphrase: handover hit rate (top-1) = %d/%d = %.0f%% (top-K = %d/%d)",
output.Learning.ParaphraseTop1Count, output.Learning.ParaphraseHandoverRun,
output.Learning.ParaphraseHandoverHitRate*100,
output.Learning.ParaphraseTopKCount, output.Learning.ParaphraseHandoverRun)
}
log.Printf("[stress] results → %s", *out)
}
// generateParaphrase asks the judge model to rephrase a staffing query
// while preserving intent — same prompt template as
// scripts/playbook_lift/main.go, kept here as a copy to avoid a shared
// internal package for two scripts. If callers ever need a third
// paraphraser, lift this into internal/paraphrase/.
//
// It POSTs a non-streaming, JSON-format chat request to
// ollamaURL+"/api/chat" and expects the assistant message content to
// be a JSON object of the shape {"paraphrase": "..."}. An error is
// returned when the request cannot be built or sent, the status is not
// 2xx, the body cannot be read or decoded, or the paraphrase is blank.
func generateParaphrase(hc *http.Client, ollamaURL, model, query string) (string, error) {
	system := `You rephrase staffing queries while preserving intent.
Output JSON only: {"paraphrase": "<rephrased query>"}.
Rules:
- Keep the same role, certifications, geography, and constraints.
- Vary the wording (synonyms, reordered clauses, different sentence shape).
- Do NOT add or remove requirements.
- Do NOT explain — just emit the JSON.`
	body, err := json.Marshal(map[string]any{
		"model":  model,
		"stream": false,
		"format": "json",
		"messages": []map[string]string{
			{"role": "system", "content": system},
			{"role": "user", "content": query},
		},
		"options": map[string]any{"temperature": 0.5},
	})
	if err != nil {
		return "", fmt.Errorf("encode paraphrase request: %w", err)
	}
	// NewRequest fails on a malformed base URL; surfacing that here
	// avoids dereferencing a nil request below.
	req, err := http.NewRequest(http.MethodPost, ollamaURL+"/api/chat", bytes.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("build paraphrase request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return "", fmt.Errorf("ollama chat: HTTP %d", resp.StatusCode)
	}
	rb, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("read ollama response: %w", err)
	}
	// The chat envelope carries the model output as a string, which is
	// itself JSON and needs a second decode.
	var ollamaResp struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(rb, &ollamaResp); err != nil {
		return "", err
	}
	var out struct {
		Paraphrase string `json:"paraphrase"`
	}
	if err := json.Unmarshal([]byte(ollamaResp.Message.Content), &out); err != nil {
		return "", fmt.Errorf("decode paraphrase: %w (content=%q)", err, ollamaResp.Message.Content)
	}
	if strings.TrimSpace(out.Paraphrase) == "" {
		return "", fmt.Errorf("empty paraphrase (content=%q)", ollamaResp.Message.Content)
	}
	return out.Paraphrase, nil
}
// ── helpers ──────────────────────────────────────────────────────
// loadContracts decodes every contract_*.json file directly inside dir
// into a Contract, in the order filepath.Glob reports the matches.
// It fails when the glob pattern is rejected, when nothing matches,
// when a file cannot be read, or when a file is not valid JSON for a
// Contract (the decode error is prefixed with the file path).
func loadContracts(dir string) ([]Contract, error) {
	pattern := filepath.Join(dir, "contract_*.json")
	paths, globErr := filepath.Glob(pattern)
	switch {
	case globErr != nil:
		return nil, globErr
	case len(paths) == 0:
		return nil, fmt.Errorf("no contract_*.json files in %s", dir)
	}

	var contracts []Contract
	for i := range paths {
		raw, readErr := os.ReadFile(paths[i])
		if readErr != nil {
			return nil, readErr
		}
		var parsed Contract
		if decodeErr := json.Unmarshal(raw, &parsed); decodeErr != nil {
			return nil, fmt.Errorf("%s: %w", paths[i], decodeErr)
		}
		contracts = append(contracts, parsed)
	}
	return contracts, nil
}
// buildQuery renders one demand line of a contract as a free-text
// search query. With surge > 1 the headcount is multiplied by surge and
// the query is prefixed with "URGENT:"; otherwise the plain count is
// used. Location, shift, certifications and skills are appended only
// when they are non-empty.
func buildQuery(c *Contract, d Demand, surge int) string {
	var q strings.Builder
	switch {
	case surge > 1:
		fmt.Fprintf(&q, "URGENT: need %d ", d.Count*surge)
	default:
		fmt.Fprintf(&q, "Need %d ", d.Count)
	}
	q.WriteString(d.Role)
	if loc := c.Location; loc != "" {
		q.WriteString(" for " + loc)
	}
	if shift := c.Shift; shift != "" {
		q.WriteString(", " + shift + " shift")
	}
	if len(d.Certs) > 0 {
		q.WriteString(", certifications: " + strings.Join(d.Certs, ", "))
	}
	if len(d.Skills) > 0 {
		q.WriteString(", skills: " + strings.Join(d.Skills, ", "))
	}
	return q.String()
}
// captureEvent snapshots one search call as an Event: the call's
// identifying parameters, the ranked result references (Rank is the
// zero-based position in resp.Results), the per-corpus and playbook
// counters copied from resp, and the wall-clock time of capture in
// Unix nanoseconds. resp must be non-nil.
func captureEvent(phase string, hour int, coord, contract, role, query string, surge int, usePlaybook bool, pbCorpus string, resp *matrixResp) Event {
	refs := make([]ResultRef, len(resp.Results))
	for rank := range resp.Results {
		refs[rank] = ResultRef{
			Rank:     rank,
			ID:       resp.Results[rank].ID,
			Corpus:   resp.Results[rank].Corpus,
			Distance: resp.Results[rank].Distance,
		}
	}

	var ev Event
	// What was asked, by whom, and when in the scenario.
	ev.Phase = phase
	ev.Hour = hour
	ev.Coordinator = coord
	ev.Contract = contract
	ev.Role = role
	ev.Query = query
	ev.SurgeMultiplier = surge
	ev.UsePlaybook = usePlaybook
	ev.PlaybookCorpus = pbCorpus
	// What came back.
	ev.TopK = refs
	ev.PerCorpusCounts = resp.PerCorpusCounts
	ev.PlaybookBoosted = resp.PlaybookBoosted
	ev.PlaybookInjected = resp.PlaybookInjected
	ev.TimestampUnixNano = time.Now().UnixNano()
	return ev
}
// computeDiversity summarises how much baseline result lists overlap.
// Only events whose Phase is "baseline" are considered; when several
// share the same (contract, role) the last one in events wins.
//
// Two mean Jaccard figures are produced over top-K ID lists:
//   - same role, different contracts: every unordered pair of lists
//     that share a role;
//   - different roles, same contract: every unordered pair of lists
//     that share a contract but differ in role.
//
// The number of pairs behind each mean is reported alongside it.
func computeDiversity(events []Event) Diversity {
	type pairKey struct{ contract, role string }
	type roleIDs struct {
		role string
		ids  []string
	}

	// Collapse baseline events to one ID list per (contract, role).
	latest := make(map[pairKey][]string)
	for i := range events {
		if events[i].Phase != "baseline" {
			continue
		}
		ids := make([]string, 0, len(events[i].TopK))
		for _, ref := range events[i].TopK {
			ids = append(ids, ref.ID)
		}
		latest[pairKey{events[i].Contract, events[i].Role}] = ids
	}

	// Regroup the same lists two ways: by role and by contract.
	byRole := make(map[string][][]string)
	byContract := make(map[string][]roleIDs)
	for key, ids := range latest {
		byRole[key.role] = append(byRole[key.role], ids)
		byContract[key.contract] = append(byContract[key.contract], roleIDs{role: key.role, ids: ids})
	}

	var sameRole, crossRole []float64
	for _, group := range byRole {
		for i, left := range group {
			for _, right := range group[i+1:] {
				sameRole = append(sameRole, jaccardStrings(left, right))
			}
		}
	}
	for _, group := range byContract {
		for i, left := range group {
			for _, right := range group[i+1:] {
				if left.role != right.role {
					crossRole = append(crossRole, jaccardStrings(left.ids, right.ids))
				}
			}
		}
	}

	return Diversity{
		SameRoleAcrossContractsMeanJaccard:    mean(sameRole),
		DifferentRolesSameContractMeanJaccard: mean(crossRole),
		NumPairsSameRoleAcrossContracts:       len(sameRole),
		NumPairsDifferentRolesSameContract:    len(crossRole),
	}
}
// jaccardTopK returns the Jaccard similarity of two result lists,
// comparing them by result ID only (rank, corpus and distance are
// ignored).
func jaccardTopK(a, b []ResultRef) float64 {
	idsOf := func(refs []ResultRef) []string {
		ids := make([]string, 0, len(refs))
		for _, ref := range refs {
			ids = append(ids, ref.ID)
		}
		return ids
	}
	return jaccardStrings(idsOf(a), idsOf(b))
}
// jaccardStrings returns |A ∩ B| / |A ∪ B| where A and B are the sets
// of distinct strings in a and b. Repeated entries in either slice
// count once, so the result is always within [0, 1]. Two empty inputs
// are defined as identical and yield 1.
func jaccardStrings(a, b []string) float64 {
	if len(a) == 0 && len(b) == 0 {
		return 1.0
	}
	inA := make(map[string]struct{}, len(a))
	for _, id := range a {
		inA[id] = struct{}{}
	}
	// Track what has already been seen from b so duplicates there do
	// not inflate either the intersection or the union.
	seenB := make(map[string]struct{}, len(b))
	intersect := 0
	union := len(inA)
	for _, id := range b {
		if _, dup := seenB[id]; dup {
			continue
		}
		seenB[id] = struct{}{}
		if _, shared := inA[id]; shared {
			intersect++
		} else {
			union++
		}
	}
	// union > 0 here: at least one of the inputs is non-empty.
	return float64(intersect) / float64(union)
}
// mean returns the arithmetic mean of xs, summing left to right.
// An empty or nil slice yields 0.
func mean(xs []float64) float64 {
	n := len(xs)
	if n == 0 {
		return 0
	}
	var total float64
	for i := 0; i < n; i++ {
		total += xs[i]
	}
	return total / float64(n)
}
// ── HTTP helpers ─────────────────────────────────────────────────
// matrixSearch POSTs a matrixSearchReq to gw+"/v1/matrix/search" and
// decodes the JSON reply into a matrixResp.
//
// query, corpora and k become the request's QueryText, Corpora and K;
// usePlaybook and playbookCorpus are passed through unchanged, and any
// excludeIDs are sent as ExcludeIDs. An error is returned when the
// request cannot be built or sent, when the status is not 2xx (the
// response body is included in the message), or when the reply is not
// valid JSON for a matrixResp. The response is nil on error.
func matrixSearch(hc *http.Client, gw, query string, corpora []string, k int, usePlaybook bool, playbookCorpus string, excludeIDs ...string) (*matrixResp, error) {
	body, err := json.Marshal(matrixSearchReq{
		QueryText:      query,
		Corpora:        corpora,
		K:              k,
		UsePlaybook:    usePlaybook,
		PlaybookCorpus: playbookCorpus,
		ExcludeIDs:     excludeIDs,
	})
	if err != nil {
		return nil, fmt.Errorf("encode matrix.search request: %w", err)
	}
	// NewRequest fails on a malformed gateway URL; report it instead of
	// dereferencing a nil request below.
	req, err := http.NewRequest(http.MethodPost, gw+"/v1/matrix/search", bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("build matrix.search request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		// Best effort: the body is only used as error context.
		rb, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("matrix.search %d: %s", resp.StatusCode, string(rb))
	}
	var out matrixResp
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}
// ensureFreshIndex creates the fresh_workers vectord index if it
// doesn't exist yet. Idempotent — re-creating returns 409 which we
// treat as "already there." Two-tier search pattern: fresh content
// goes to a small "hot" index and the search merges it with the
// main workers index. Solves the HNSW post-build add recall issue
// surfaced in runs #003-#005 (incremental adds to a 5K+ HNSW graph
// can land in poorly-connected regions and disappear from search;
// a small hot index has no such crowding).
//
// The request is a POST to gw+"/v1/vectors/index" naming indexName
// with dimension dim and cosine distance. HTTP 200, 201 and 409 count
// as success; any other status, or a failure to build or send the
// request, is returned as an error.
func ensureFreshIndex(hc *http.Client, gw, indexName string, dim int) error {
	body, err := json.Marshal(map[string]any{
		"name":      indexName,
		"dimension": dim,
		"distance":  "cosine",
	})
	if err != nil {
		return fmt.Errorf("create index: %w", err)
	}
	// NewRequest fails on a malformed gateway URL; report it instead of
	// dereferencing a nil request below.
	req, err := http.NewRequest(http.MethodPost, gw+"/v1/vectors/index", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("create index: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return fmt.Errorf("create index: %w", err)
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK, http.StatusCreated, http.StatusConflict:
		return nil
	}
	// Best effort: the body is only used as error context.
	rb, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("create index %d: %s", resp.StatusCode, string(rb))
}
// ingestFreshWorker embeds + adds a single fresh worker to the
// given vectord index. Two HTTP hops via the gateway: /v1/embed for
// the vector, /v1/vectors/index/<idx>/add to insert. The idx
// parameter exists so callers can target a separate hot index
// (fresh_workers) rather than the main 5K-item workers index, where
// HNSW post-build recall is unreliable.
//
// id is the item ID, text is what gets embedded, and metadata is
// stored alongside the vector (nil is sent as JSON null). idx is
// concatenated into the URL path as-is, so it must already be
// path-safe. Metadata that cannot be JSON-encoded is rejected before
// any request is made rather than silently stored as null.
func ingestFreshWorker(hc *http.Client, gw, idx, id, text string, metadata map[string]any) error {
	// Encode metadata first so a bad value fails fast, before the
	// embed round trip.
	metaBs, err := json.Marshal(metadata)
	if err != nil {
		return fmt.Errorf("encode metadata: %w", err)
	}

	// Hop 1: embed the text.
	embedBs, err := json.Marshal(map[string]any{
		"texts": []string{text},
		"model": "nomic-embed-text-v2-moe",
	})
	if err != nil {
		return fmt.Errorf("embed: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, gw+"/v1/embed", bytes.NewReader(embedBs))
	if err != nil {
		return fmt.Errorf("embed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return fmt.Errorf("embed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		// Best effort: the body is only used as error context.
		rb, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("embed %d: %s", resp.StatusCode, string(rb))
	}
	var er struct {
		Vectors [][]float32 `json:"vectors"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&er); err != nil {
		return fmt.Errorf("decode embed: %w", err)
	}
	if len(er.Vectors) == 0 || len(er.Vectors[0]) == 0 {
		return fmt.Errorf("embed returned no vectors")
	}

	// Hop 2: insert the vector with its metadata.
	addBs, err := json.Marshal(map[string]any{
		"items": []map[string]any{
			{"id": id, "vector": er.Vectors[0], "metadata": json.RawMessage(metaBs)},
		},
	})
	if err != nil {
		return fmt.Errorf("vectord add: %w", err)
	}
	req2, err := http.NewRequest(http.MethodPost, gw+"/v1/vectors/index/"+idx+"/add", bytes.NewReader(addBs))
	if err != nil {
		return fmt.Errorf("vectord add: %w", err)
	}
	req2.Header.Set("Content-Type", "application/json")
	resp2, err := hc.Do(req2)
	if err != nil {
		return fmt.Errorf("vectord add: %w", err)
	}
	defer resp2.Body.Close()
	if resp2.StatusCode/100 != 2 {
		rb, _ := io.ReadAll(resp2.Body)
		return fmt.Errorf("vectord add %d: %s", resp2.StatusCode, string(rb))
	}
	return nil
}
// langfuseEnvCreds holds the three Langfuse settings that
// loadLangfuseEnv reads from an env file.
type langfuseEnvCreds struct {
// URL is the value assigned to LANGFUSE_URL.
URL string
// PublicKey is the value assigned to LANGFUSE_PUBLIC_KEY.
PublicKey string
// SecretKey is the value assigned to LANGFUSE_SECRET_KEY.
SecretKey string
}
// loadLangfuseEnv reads Langfuse credentials from the env file at path.
//
// The file is scanned line by line. Blank lines, lines starting with
// "#", and lines without "=" are ignored. Each remaining line is split
// at its first "="; key and value are trimmed of surrounding
// whitespace. Only LANGFUSE_URL, LANGFUSE_PUBLIC_KEY and
// LANGFUSE_SECRET_KEY are kept, and a later assignment overrides an
// earlier one. If any of the three ends up empty an error is returned,
// so callers can skip tracing with a warning instead of running
// half-configured.
func loadLangfuseEnv(path string) (*langfuseEnvCreds, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var creds langfuseEnvCreds
	// Destination field for each recognised key.
	targets := map[string]*string{
		"LANGFUSE_URL":        &creds.URL,
		"LANGFUSE_PUBLIC_KEY": &creds.PublicKey,
		"LANGFUSE_SECRET_KEY": &creds.SecretKey,
	}

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		entry := strings.TrimSpace(scanner.Text())
		if entry == "" || strings.HasPrefix(entry, "#") {
			continue
		}
		name, value, found := strings.Cut(entry, "=")
		if !found {
			continue
		}
		if dst, known := targets[strings.TrimSpace(name)]; known {
			*dst = strings.TrimSpace(value)
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}

	if creds.URL == "" || creds.PublicKey == "" || creds.SecretKey == "" {
		return nil, fmt.Errorf("langfuse env missing one of URL/PUBLIC_KEY/SECRET_KEY")
	}
	return &creds, nil
}
// firstDistance reports the Distance of the first entry in results,
// or 0 when results is empty.
func firstDistance(results []matrixResult) float32 {
	var dist float32
	if len(results) > 0 {
		dist = results[0].Distance
	}
	return dist
}
// parsedDemand is the LLM-extracted structure from an inbox message
// body — what a real coordinator would type into a search form.
// Empty fields are honest: the body didn't say.
//
// It is decoded from the JSON object the model returns in
// parseInboxDemand and turned into a search string by AsQuery.
type parsedDemand struct {
// Role is the requested job role; parseInboxDemand rejects a blank one.
Role string `json:"role"`
// Count is the requested headcount; AsQuery omits it unless positive.
Count int `json:"count"`
// Location is the free-text place named in the body, if any.
Location string `json:"location"`
// Certs lists certifications as named in the body.
Certs []string `json:"certs"`
// Skills lists skills as named in the body.
Skills []string `json:"skills"`
// Shift is the shift named in the body, or "" when none was given.
Shift string `json:"shift"`
}
// AsQuery renders the parsed fields as a matrix.search query string in
// the same shape buildQuery produces, so inbox-driven searches read
// like the contract-driven ones. The headcount appears only when Count
// is positive; location, shift, certifications and skills are each
// left out entirely when empty.
func (p parsedDemand) AsQuery() string {
	head := "Need "
	if p.Count > 0 {
		head = fmt.Sprintf("Need %d ", p.Count)
	}
	segments := []string{head, p.Role}
	if p.Location != "" {
		segments = append(segments, " for ", p.Location)
	}
	if p.Shift != "" {
		segments = append(segments, ", ", p.Shift, " shift")
	}
	if len(p.Certs) > 0 {
		segments = append(segments, ", certifications: ", strings.Join(p.Certs, ", "))
	}
	if len(p.Skills) > 0 {
		segments = append(segments, ", skills: ", strings.Join(p.Skills, ", "))
	}
	return strings.Join(segments, "")
}
// judgeInboxResult rates the top retrieval against the ORIGINAL inbox
// body. Returns 1-5 (5 = perfect match, 1 = irrelevant); 0 on any
// error. Real product driver: a tight-distance result on a
// LLM-parsed query may still be wrong-domain (parser dropped a
// critical constraint, or the corpus genuinely has no match). The
// rating gives coordinators an honest "this is close in vector
// space but doesn't actually fit your ask" signal.
//
// "Any error" covers a request that cannot be built or sent, a non-2xx
// status, an unreadable or undecodable reply, and a rating outside
// 1-5. The call is a non-streaming, JSON-format chat request to
// ollamaURL+"/api/chat" at temperature 0.
func judgeInboxResult(hc *http.Client, ollamaURL, model, inboxBody string, top matrixResult) int {
	system := `You rate retrieval results for a staffing co-pilot.
Rate the result 1-5 against the original inbox request:
5 = perfect match (this person/role IS what was asked for)
4 = strong match (right field, right level, minor mismatches)
3 = adjacent match (related field or partial overlap)
2 = weak/tangential match
1 = irrelevant
Output JSON only: {"rating": N, "reason": "<one sentence>"}.`
	user := fmt.Sprintf("Original inbox request:\n%s\n\nResult corpus: %s\nResult ID: %s\nResult metadata:\n%s",
		inboxBody, top.Corpus, top.ID, string(top.Metadata))
	body, err := json.Marshal(map[string]any{
		"model":  model,
		"stream": false,
		"format": "json",
		"messages": []map[string]string{
			{"role": "system", "content": system},
			{"role": "user", "content": user},
		},
		"options": map[string]any{"temperature": 0},
	})
	if err != nil {
		return 0
	}
	// NewRequest fails on a malformed base URL; honour the "0 on any
	// error" contract instead of dereferencing a nil request.
	req, err := http.NewRequest(http.MethodPost, ollamaURL+"/api/chat", bytes.NewReader(body))
	if err != nil {
		return 0
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return 0
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return 0
	}
	rb, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0
	}
	// The chat envelope carries the model output as a string, which is
	// itself JSON and needs a second decode.
	var ollamaResp struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(rb, &ollamaResp); err != nil {
		return 0
	}
	var v struct {
		Rating int `json:"rating"`
	}
	if err := json.Unmarshal([]byte(ollamaResp.Message.Content), &v); err != nil {
		return 0
	}
	if v.Rating < 1 || v.Rating > 5 {
		return 0
	}
	return v.Rating
}
// parseInboxDemand asks the judge model to extract structured fields
// from an inbox body. Same Ollama+JSON-format pattern as the
// generateParaphrase function. Real production would have a dedicated
// small model fine-tuned on staffing-language inbox parsing; here we
// use the same model that judges relevance. temperature=0 for
// deterministic extraction.
//
// An error is returned when the request cannot be built or sent, the
// status is not 2xx, the reply cannot be read or decoded, or the
// decoded demand has a blank role. The result is nil on error.
func parseInboxDemand(hc *http.Client, ollamaURL, model, inboxBody string) (*parsedDemand, error) {
	system := `You parse staffing requests from email/SMS bodies. Extract structured fields.
Output JSON ONLY, this exact shape: {"role": "...", "count": N, "location": "...", "certs": [...], "skills": [...], "shift": "..."}.
Rules:
- role: the job role being requested (lowercase, e.g. "warehouse worker", "forklift operator")
- count: number of workers needed (integer; if "a few" or unspecified, use 1)
- location: city + state if both mentioned (e.g. "Cleveland, OH"); city only if state missing
- certs: certification list as named in the body (e.g. ["OSHA-30", "forklift cert"])
- skills: skill list as named in the body (e.g. ["pallet jack", "spanish"])
- shift: "day"|"swing"|"night" if mentioned, else ""
- If a field isn't in the body, use empty string or empty array (never null)
- Do NOT explain — emit the JSON only.`
	body, err := json.Marshal(map[string]any{
		"model":  model,
		"stream": false,
		"format": "json",
		"messages": []map[string]string{
			{"role": "system", "content": system},
			{"role": "user", "content": inboxBody},
		},
		"options": map[string]any{"temperature": 0},
	})
	if err != nil {
		return nil, fmt.Errorf("encode parse request: %w", err)
	}
	// NewRequest fails on a malformed base URL; report it instead of
	// dereferencing a nil request below.
	req, err := http.NewRequest(http.MethodPost, ollamaURL+"/api/chat", bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("build parse request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("ollama chat: HTTP %d", resp.StatusCode)
	}
	rb, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read ollama response: %w", err)
	}
	// The chat envelope carries the model output as a string, which is
	// itself JSON and needs a second decode.
	var ollamaResp struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(rb, &ollamaResp); err != nil {
		return nil, err
	}
	var out parsedDemand
	if err := json.Unmarshal([]byte(ollamaResp.Message.Content), &out); err != nil {
		return nil, fmt.Errorf("decode parsed demand: %w (content=%q)", err, ollamaResp.Message.Content)
	}
	if strings.TrimSpace(out.Role) == "" {
		return nil, fmt.Errorf("parsed demand has empty role (content=%q)", ollamaResp.Message.Content)
	}
	return &out, nil
}
// postInbox sends an inbox message (email or SMS) to observerd via
// the gateway. observerd records it as an ObservedOp with
// Source=SourceInbox; downstream actions (search, ingest, etc.) are
// the caller's concern.
//
// The message is POSTed as JSON to gw+"/v1/observer/inbox" with the
// keys type, sender, subject, body, priority and tag. An error is
// returned when the request cannot be built or sent, or when the
// status is not 2xx (the response body is included in the message).
func postInbox(hc *http.Client, gw, msgType, sender, subject, body, priority, tag string) error {
	bodyJSON, err := json.Marshal(map[string]any{
		"type":     msgType,
		"sender":   sender,
		"subject":  subject,
		"body":     body,
		"priority": priority,
		"tag":      tag,
	})
	if err != nil {
		return fmt.Errorf("inbox: %w", err)
	}
	// NewRequest fails on a malformed gateway URL; report it instead of
	// dereferencing a nil request below.
	req, err := http.NewRequest(http.MethodPost, gw+"/v1/observer/inbox", bytes.NewReader(bodyJSON))
	if err != nil {
		return fmt.Errorf("inbox: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		// Best effort: the body is only used as error context.
		rb, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("inbox %d: %s", resp.StatusCode, string(rb))
	}
	return nil
}
// coordByName looks up a coordinator by name. Used by inbox-triggered
// searches that route based on the email's tagged coordinator.
//
// The first coordinator whose Name equals name is returned. When no
// name matches, the first entry of coords is used as the fallback.
// An empty coords yields the zero Coordinator rather than panicking.
func coordByName(coords []Coordinator, name string) Coordinator {
	if len(coords) == 0 {
		return Coordinator{}
	}
	for _, c := range coords {
		if c.Name == name {
			return c
		}
	}
	return coords[0]
}
// playbookRecord stores one query→answer pairing in a playbook by
// POSTing JSON to gw+"/v1/matrix/playbooks/record".
//
// query is sent as query_text, answerID and answerCorpus identify the
// recorded answer, score is passed through as given, and corpus names
// the playbook corpus to write to. Every record is tagged
// "multi-coord-stress". An error is returned when the request cannot
// be built or sent, or when the status is not 2xx (the response body
// is included in the message).
func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, score float64, corpus string) error {
	body, err := json.Marshal(map[string]any{
		"query_text":    query,
		"answer_id":     answerID,
		"answer_corpus": answerCorpus,
		"score":         score,
		"tags":          []string{"multi-coord-stress"},
		"corpus":        corpus,
	})
	if err != nil {
		// Reachable for a NaN or infinite score, which JSON cannot encode.
		return fmt.Errorf("playbook record: %w", err)
	}
	// NewRequest fails on a malformed gateway URL; report it instead of
	// dereferencing a nil request below.
	req, err := http.NewRequest(http.MethodPost, gw+"/v1/matrix/playbooks/record", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("playbook record: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		// Best effort: the body is only used as error context.
		rb, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("playbook record %d: %s", resp.StatusCode, string(rb))
	}
	return nil
}
// must unwraps a (value, error) pair: it hands back v when err is nil
// and otherwise logs the error with the "[stress]" prefix via
// log.Fatalf, which terminates the program.
func must[T any](v T, err error) T {
	if err == nil {
		return v
	}
	log.Fatalf("[stress] %v", err)
	return v // not reached: log.Fatalf exits the process
}