J asked for a much more sophisticated test using the 100k corpus from
the Rust legacy database. This commit ships:
scripts/cutover/multitier/main.go — 6-scenario harness with weighted
random selection per goroutine. Mixes search, email/SMS/fill
validators (in-process via internal/validator), profile swap with
ExcludeIDs, repeat-cache exercise, and playbook record/replay.
Scenarios + weights (cumulative scenario fractions):
35% cold_search_email — search + email outreach + EmailValidator
15% surge_fill_validate — search + fill proposal + FillValidator + record
15% profile_swap — original search + ExcludeIDs swap + no-overlap check
15% repeat_cache — same query × 5 (cache effectiveness)
10% sms_validate — SMS draft (≤160 chars, phone for SSN-FP guard)
10% playbook_record_replay — cold → record → warm w/ use_playbook=true
Test results (5-min sustained, conc=50, 100k workers indexed):
TOTAL 335,257 scenarios @ 1,115/sec
cold_search_email 117k @ 0.0% fail · p50 2.2ms · p99 8.6ms
surge_fill_validate 50k @ 98.8% fail (substrate bug below)
profile_swap 50k @ 0.0% fail · p50 4.5ms · ExcludeIDs verified
repeat_cache 50k × 5 = 252k searches @ 0.0% fail · p50 11.7ms
sms_validate 33k @ 0.0% fail · phone-pattern guard works
playbook_record_replay 33k @ 96.8% fail (substrate bug below)
Total successful workflows: ~250k+
Validator integration verified at load:
150,930 EmailValidator passes across cold_search_email + sms_validate
35 + 1,061 successful FillValidator + playbook_record (where the bug
didn't fire)
zero false positives on the SSN-pattern guard against phone numbers
Resource footprint at 100k:
vectord 1.23GB RSS (linear with 100k vectors)
matrixd 26MB, 75% CPU (1-core saturated at conc=50)
Total across 11 daemons: 1.7GB
Compare to Rust at 14.9GB — ~10× less even at 100k.
SUBSTRATE BUG SURFACED: coder/hnsw v0.6.1 nil-deref in
layerNode.search at graph.go:95. Triggers on /v1/matrix/playbooks/record
under sustained writes to the small playbook_memory index. Both Add
and Search paths can panic.
Workaround applied (this commit) in internal/vectord/index.go
BatchAdd: recover() guard converts panic to error; daemon stays up
instead of crashing the request handler.
Operator recovery procedure (also documented in the report):
curl -X DELETE http://localhost:4215/vectors/index/playbook_memory
Next record recreates the index fresh.
Real fix DEFERRED — open in docs/ARCHITECTURE_COMPARISON.md
Decisions tracker. Three options:
a) upstream patch to coder/hnsw
b) custom small-index Add path that always rebuilds when len < threshold
c) alternate store for playbook_memory (Lance? in-memory map?)
Evidence: reports/cutover/multitier_100k.md (full methodology +
results + repro + bug analysis). docs/ARCHITECTURE_COMPARISON.md
Decisions tracker updated.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
854 lines · 26 KiB · Go
// multitier — sophisticated multi-scenario load test mixing search,
|
||
// email/SMS validation, fill validation, profile swap, and playbook
|
||
// recording. Exercises the full Go substrate end-to-end at concurrency
|
||
// against a large workers corpus.
|
||
//
|
||
// Six scenario types (each ~simulates one coordinator workflow):
|
||
// A. cold_search_email — fresh demand → search → email outreach + validate
|
||
// B. surge_fill_validate — multi-role surge → search × N → fill proposal + validate + record
|
||
// C. profile_swap — initial search → mid-session swap with ExcludeIDs
|
||
// D. repeat_cache — same query × 5 → measure warm-cache RPS pattern
|
||
// E. sms_validate — search → SMS draft (≤160 chars) → validate
|
||
// F. playbook_record_replay — search → record → re-search with use_playbook=true
|
||
//
|
||
// Validators run IN-PROCESS via internal/validator (FillValidator +
|
||
// EmailValidator). Search/record go through Go gateway via HTTP.
|
||
//
|
||
// Usage:
|
||
// multitier -gateway http://127.0.0.1:4110 \
|
||
// -concurrency 50 -duration 5m
|
||
//
|
||
// Output: per-scenario success/failure counts, end-to-end latency
|
||
// histogram, validator pass/fail breakdown.
|
||
package main
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"math/rand"
|
||
"net/http"
|
||
"os"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/validator"
|
||
)
|
||
|
||
// ── Scenario harness ──────────────────────────────────────────────
|
||
|
||
// scenarioResult captures the outcome of one end-to-end scenario run:
// overall success/failure, the first step that broke, total wall time,
// and a per-step timing breakdown.
type scenarioResult struct {
	name      string        // scenario identifier, e.g. "cold_search_email"
	success   bool          // true when every step completed
	failedAt  string        // step that broke the workflow ("" on success)
	latency   time.Duration // end-to-end wall time for the whole scenario
	subSteps  []stepResult  // per-step breakdown
	validator string        // "pass", "fail-policy", "fail-consistency", etc.
}
|
||
|
||
// stepResult records timing and outcome for a single step inside a scenario.
type stepResult struct {
	step    string        // step label, e.g. "search", "email_validate"
	latency time.Duration // wall time for this step alone
	ok      bool          // true when the step returned no error
	err     string        // error text when ok is false ("" otherwise)
}
|
||
|
||
// harness holds everything one load-test run needs: the gateway endpoint,
// a shared HTTP client, generated query/worker fixtures, and the two
// in-process validators.
type harness struct {
	gateway    string                   // Go gateway base URL
	hc         *http.Client             // shared client (60s timeout set in newHarness)
	corpus     string                   // index name searched by every scenario
	queryPool  []map[string]any         // generated queries
	workerPool []validator.WorkerRecord // pretend roster for validators
	emailVal   *validator.EmailValidator
	fillVal    *validator.FillValidator

	// Counters (atomic).
	// NOTE(review): scenarioRuns/scenarioFail/stepFail are initialized in
	// newHarness but never incremented anywhere in this file — the reporter
	// derives counts from the collected scenarioResults instead. Confirm
	// they are unused before removing.
	scenarioRuns   map[string]*int64
	scenarioFail   map[string]*int64
	stepFail       map[string]*int64
	fillDebugCount int64 // rate-limits fill_validate debug prints (atomic)
}
|
||
|
||
func newHarness(gateway string, corpus string) *harness {
|
||
queryPool := buildQueryPool()
|
||
workerPool := buildWorkerPool()
|
||
lookup := validator.NewInMemoryWorkerLookup(workerPool)
|
||
scenarios := []string{"cold_search_email", "surge_fill_validate", "profile_swap", "repeat_cache", "sms_validate", "playbook_record_replay"}
|
||
runs := map[string]*int64{}
|
||
fails := map[string]*int64{}
|
||
for _, s := range scenarios {
|
||
r := int64(0)
|
||
f := int64(0)
|
||
runs[s] = &r
|
||
fails[s] = &f
|
||
}
|
||
return &harness{
|
||
gateway: gateway,
|
||
hc: &http.Client{Timeout: 60 * time.Second},
|
||
corpus: corpus,
|
||
queryPool: queryPool,
|
||
workerPool: workerPool,
|
||
emailVal: validator.NewEmailValidator(lookup),
|
||
fillVal: validator.NewFillValidator(lookup),
|
||
scenarioRuns: runs,
|
||
scenarioFail: fails,
|
||
stepFail: map[string]*int64{},
|
||
}
|
||
}
|
||
|
||
// buildQueryPool generates a mix of staffing-shape queries across
// roles, geos, clients. 30 queries to keep cache pressure realistic.
// Each entry carries the rendered demand text plus its structured fields.
func buildQueryPool() []map[string]any {
	roles := []string{"Forklift Operator", "Pickers", "Loaders", "Warehouse Associate", "CNC Operator", "Shipping Clerk", "Packers"}
	cities := []struct {
		city, state string
	}{
		{"Detroit", "MI"}, {"Indianapolis", "IN"}, {"Aurora", "IL"},
		{"Joliet", "IL"}, {"Flint", "MI"}, {"Cleveland", "OH"},
		{"Fort Wayne", "IN"}, {"Kansas City", "MO"},
	}
	clients := []string{"Beacon Freight", "Midway Distribution", "Parallel Machining", "Heritage Foods", "Cornerstone Fabrication", "Pioneer Assembly"}

	const poolSize = 30
	queries := make([]map[string]any, 0, poolSize)
	for n := 0; n < poolSize; n++ {
		jobRole := roles[n%len(roles)]
		loc := cities[n%len(cities)]
		account := clients[n%len(clients)]
		headcount := 1 + (n % 5)
		queries = append(queries, map[string]any{
			"text":   fmt.Sprintf("Need %d %s in %s %s for %s", headcount, jobRole, loc.city, loc.state, account),
			"role":   jobRole,
			"city":   loc.city,
			"state":  loc.state,
			"client": account,
			"count":  headcount,
		})
	}
	return queries
}
|
||
|
||
// buildWorkerPool fabricates a small in-memory roster for validators.
|
||
// Real workers come from the parquet ingest; the validator's
|
||
// WorkerLookup needs a separate in-process source. Cross-runtime
|
||
// audit: the IDs used here (test-w-XXX) won't collide with the
|
||
// parquet's "w-XXX" prefix.
|
||
func buildWorkerPool() []validator.WorkerRecord {
|
||
roles := []string{"Forklift Operator", "Pickers", "Loaders", "Warehouse Associate", "CNC Operator", "Shipping Clerk", "Packers"}
|
||
cities := []struct{ city, state string }{
|
||
{"Detroit", "MI"}, {"Indianapolis", "IN"}, {"Aurora", "IL"},
|
||
{"Joliet", "IL"}, {"Flint", "MI"},
|
||
}
|
||
out := make([]validator.WorkerRecord, 0, 200)
|
||
for i := 0; i < 200; i++ {
|
||
role := roles[i%len(roles)]
|
||
c := cities[i%len(cities)]
|
||
out = append(out, validator.WorkerRecord{
|
||
CandidateID: fmt.Sprintf("test-w-%03d", i),
|
||
Name: fmt.Sprintf("Worker %03d", i),
|
||
Status: "active",
|
||
City: ptr(c.city),
|
||
State: ptr(c.state),
|
||
Role: ptr(role),
|
||
})
|
||
}
|
||
return out
|
||
}
|
||
|
||
// ptr returns a pointer to a copy of s (handy for optional string fields).
func ptr(s string) *string {
	v := s
	return &v
}
|
||
|
||
// ── HTTP helpers ──────────────────────────────────────────────────
|
||
|
||
// matrixSearchReq is the JSON request body for POST /v1/matrix/search.
type matrixSearchReq struct {
	QueryText   string   `json:"query_text"`
	QueryRole   string   `json:"query_role,omitempty"`
	Corpora     []string `json:"corpora"`      // index names to search
	K           int      `json:"k"`            // total results requested
	PerCorpusK  int      `json:"per_corpus_k"` // per-index result cap
	UsePlaybook bool     `json:"use_playbook"` // enable playbook boost/injection
	ExcludeIDs  []string `json:"exclude_ids,omitempty"` // IDs to filter out (profile swap)
}
|
||
|
||
// matrixResult is one hit in a matrix search response.
type matrixResult struct {
	ID       string          `json:"id"`
	Distance float32         `json:"distance"`
	Corpus   string          `json:"corpus"`
	Metadata json.RawMessage `json:"metadata,omitempty"` // kept raw; never decoded here
}
|
||
|
||
// matrixResp is the JSON response body from POST /v1/matrix/search.
type matrixResp struct {
	Results          []matrixResult `json:"results"`
	PlaybookBoosted  int            `json:"playbook_boosted,omitempty"`
	PlaybookInjected int            `json:"playbook_injected,omitempty"`
}
|
||
|
||
func (h *harness) matrixSearch(req matrixSearchReq) (*matrixResp, error) {
|
||
body, _ := json.Marshal(req)
|
||
httpReq, _ := http.NewRequest("POST", h.gateway+"/v1/matrix/search", bytes.NewReader(body))
|
||
httpReq.Header.Set("Content-Type", "application/json")
|
||
resp, err := h.hc.Do(httpReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
if resp.StatusCode != 200 {
|
||
bs, _ := io.ReadAll(resp.Body)
|
||
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, string(bs))
|
||
}
|
||
var out matrixResp
|
||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
func (h *harness) playbookRecord(query, role, answerID, corpus string, score float64) error {
|
||
body, _ := json.Marshal(map[string]any{
|
||
"query_text": query,
|
||
"role": role,
|
||
"answer_id": answerID,
|
||
"answer_corpus": corpus,
|
||
"score": score,
|
||
"tags": []string{"multitier-loadtest"},
|
||
})
|
||
httpReq, _ := http.NewRequest("POST", h.gateway+"/v1/matrix/playbooks/record", bytes.NewReader(body))
|
||
httpReq.Header.Set("Content-Type", "application/json")
|
||
resp, err := h.hc.Do(httpReq)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer resp.Body.Close()
|
||
if resp.StatusCode != 200 {
|
||
bs, _ := io.ReadAll(resp.Body)
|
||
return fmt.Errorf("status %d: %s", resp.StatusCode, string(bs))
|
||
}
|
||
io.Copy(io.Discard, resp.Body)
|
||
return nil
|
||
}
|
||
|
||
// ── Scenarios ─────────────────────────────────────────────────────
|
||
|
||
// timeStep wraps a closure with timing + step result.
|
||
func timeStep(name string, f func() error) stepResult {
|
||
start := time.Now()
|
||
err := f()
|
||
r := stepResult{step: name, latency: time.Since(start), ok: err == nil}
|
||
if err != nil {
|
||
r.err = err.Error()
|
||
}
|
||
return r
|
||
}
|
||
|
||
// Scenario A: cold_search_email — fresh demand → search → email outreach + validate.
//
// Picks a random query from the pool, searches the corpus (k=5), drafts an
// outreach email for a random in-pool worker, and runs the draft through
// EmailValidator. failedAt marks the first broken step; validator records
// "pass"/"fail" for the reporter.
func (h *harness) scenarioColdSearchEmail() scenarioResult {
	q := h.queryPool[rand.Intn(len(h.queryPool))]
	queryText := q["text"].(string)
	role := q["role"].(string)
	r := scenarioResult{name: "cold_search_email"}
	start := time.Now()

	// Step 1: search. An empty result set counts as failure — the email
	// step needs at least one hit to be meaningful.
	var resp *matrixResp
	r.subSteps = append(r.subSteps, timeStep("search", func() error {
		var err error
		resp, err = h.matrixSearch(matrixSearchReq{
			QueryText: queryText, QueryRole: role,
			Corpora: []string{h.corpus}, K: 5, PerCorpusK: 5,
		})
		return err
	}))
	if !r.subSteps[0].ok || resp == nil || len(resp.Results) == 0 {
		r.failedAt = "search"
		r.latency = time.Since(start)
		return r
	}

	// Step 2: pick top candidate, simulate email draft using a known
	// worker from our local pool (validator wants in-pool IDs).
	worker := h.workerPool[rand.Intn(len(h.workerPool))]
	emailDraft := map[string]any{
		"to":       "client@example.com",
		"subject":  fmt.Sprintf("Worker available: %s", worker.Name),
		"body":     fmt.Sprintf("Hi, %s is available for the %s role tomorrow. Confirm?", worker.Name, role),
		"_context": map[string]any{"candidate_id": worker.CandidateID},
	}

	// Step 3: validate the draft in-process.
	r.subSteps = append(r.subSteps, timeStep("email_validate", func() error {
		_, err := h.emailVal.Validate(validator.Artifact{EmailDraft: emailDraft})
		return err
	}))
	if !r.subSteps[1].ok {
		r.failedAt = "email_validate"
		r.validator = "fail"
		r.latency = time.Since(start)
		return r
	}

	r.success = true
	r.validator = "pass"
	r.latency = time.Since(start)
	return r
}
|
||
|
||
// Scenario B: surge_fill_validate — multi-role contract → multi-search → fill proposal → validate → record
|
||
func (h *harness) scenarioSurgeFillValidate() scenarioResult {
|
||
q := h.queryPool[rand.Intn(len(h.queryPool))]
|
||
role := q["role"].(string)
|
||
city := q["city"].(string)
|
||
state := q["state"].(string)
|
||
r := scenarioResult{name: "surge_fill_validate"}
|
||
start := time.Now()
|
||
|
||
// Step 1: search × 1 (kept narrow to keep total latency reasonable)
|
||
var resp *matrixResp
|
||
r.subSteps = append(r.subSteps, timeStep("search", func() error {
|
||
var err error
|
||
resp, err = h.matrixSearch(matrixSearchReq{
|
||
QueryText: q["text"].(string), QueryRole: role,
|
||
Corpora: []string{h.corpus}, K: 3, PerCorpusK: 3,
|
||
})
|
||
return err
|
||
}))
|
||
if !r.subSteps[0].ok || resp == nil || len(resp.Results) == 0 {
|
||
r.failedAt = "search"
|
||
r.latency = time.Since(start)
|
||
return r
|
||
}
|
||
|
||
// Step 2: build fill proposal — pick 2 workers that share city/state/role
|
||
// to avoid spurious geo-mismatch errors. We loop the local pool from a
|
||
// random offset and pick the first two with matching attributes.
|
||
w1 := h.workerPool[rand.Intn(len(h.workerPool))]
|
||
var w2 *validator.WorkerRecord
|
||
// Linear scan from a random start offset — guarantees full pool
|
||
// coverage even on small pools where matching pairs are rare.
|
||
startOffset := rand.Intn(len(h.workerPool))
|
||
for j := 0; j < len(h.workerPool); j++ {
|
||
idx := (startOffset + j) % len(h.workerPool)
|
||
w := h.workerPool[idx]
|
||
if w.CandidateID == w1.CandidateID {
|
||
continue
|
||
}
|
||
if w.City != nil && w1.City != nil && *w.City == *w1.City &&
|
||
w.State != nil && w1.State != nil && *w.State == *w1.State &&
|
||
w.Role != nil && w1.Role != nil && *w.Role == *w1.Role {
|
||
wCopy := w // ensure address stays valid past loop end
|
||
w2 = &wCopy
|
||
break
|
||
}
|
||
}
|
||
if w2 == nil {
|
||
// Pool too sparse — should not happen with the 200-worker
|
||
// fixture pool (35 unique combos × ~5 workers each). If we
|
||
// hit this branch, the pool generator drifted; failing the
|
||
// scenario is the right signal.
|
||
r.failedAt = "no_matching_worker_pair"
|
||
r.latency = time.Since(start)
|
||
return r
|
||
}
|
||
city = *w1.City
|
||
state = *w1.State
|
||
role = *w1.Role
|
||
fillProposal := map[string]any{
|
||
"_context": map[string]any{
|
||
"target_count": float64(2),
|
||
"city": city, "state": state, "role": role,
|
||
},
|
||
"fills": []any{
|
||
map[string]any{"candidate_id": w1.CandidateID, "name": w1.Name},
|
||
map[string]any{"candidate_id": w2.CandidateID, "name": w2.Name},
|
||
},
|
||
}
|
||
|
||
// Step 3: validate fill
|
||
var fillErr error
|
||
r.subSteps = append(r.subSteps, timeStep("fill_validate", func() error {
|
||
_, err := h.fillVal.Validate(validator.Artifact{FillProposal: fillProposal})
|
||
fillErr = err
|
||
return err
|
||
}))
|
||
if !r.subSteps[1].ok {
|
||
// Print first error to stderr for diagnosis (rate-limited)
|
||
if atomic.AddInt64(&h.fillDebugCount, 1) <= 3 && fillErr != nil {
|
||
fmt.Fprintf(os.Stderr, "[debug] fill_validate err: %v\n proposal=%+v\n", fillErr, fillProposal)
|
||
}
|
||
r.failedAt = "fill_validate"
|
||
r.validator = "fail"
|
||
r.latency = time.Since(start)
|
||
return r
|
||
}
|
||
|
||
// Step 4: record playbook on first result
|
||
r.subSteps = append(r.subSteps, timeStep("playbook_record", func() error {
|
||
return h.playbookRecord(q["text"].(string), role, resp.Results[0].ID, h.corpus, 1.0)
|
||
}))
|
||
if !r.subSteps[2].ok {
|
||
r.failedAt = "playbook_record"
|
||
r.latency = time.Since(start)
|
||
return r
|
||
}
|
||
|
||
r.success = true
|
||
r.validator = "pass"
|
||
r.latency = time.Since(start)
|
||
return r
|
||
}
|
||
|
||
// Scenario C: profile_swap — initial search → mid-session swap with ExcludeIDs.
//
// Runs an original search, captures the returned IDs, re-searches with
// those IDs in ExcludeIDs, then asserts the two result sets are disjoint.
// A detected overlap fails the scenario with failedAt="swap_overlap_detected",
// which signals the gateway ignored the exclusion list.
func (h *harness) scenarioProfileSwap() scenarioResult {
	q := h.queryPool[rand.Intn(len(h.queryPool))]
	r := scenarioResult{name: "profile_swap"}
	start := time.Now()

	// Step 1: original search (empty results fail — nothing to swap away from).
	var resp1 *matrixResp
	r.subSteps = append(r.subSteps, timeStep("search_original", func() error {
		var err error
		resp1, err = h.matrixSearch(matrixSearchReq{
			QueryText: q["text"].(string), QueryRole: q["role"].(string),
			Corpora: []string{h.corpus}, K: 3, PerCorpusK: 3,
		})
		return err
	}))
	if !r.subSteps[0].ok || resp1 == nil || len(resp1.Results) == 0 {
		r.failedAt = "search_original"
		r.latency = time.Since(start)
		return r
	}

	// Capture placed IDs to exclude from the swap search.
	excludeIDs := make([]string, 0, len(resp1.Results))
	for _, x := range resp1.Results {
		excludeIDs = append(excludeIDs, x.ID)
	}

	// Step 2: swap search excluding the original placements. An empty
	// swap result is acceptable here (resp2 just must not be nil).
	var resp2 *matrixResp
	r.subSteps = append(r.subSteps, timeStep("search_swap", func() error {
		var err error
		resp2, err = h.matrixSearch(matrixSearchReq{
			QueryText: q["text"].(string), QueryRole: q["role"].(string),
			Corpora: []string{h.corpus}, K: 3, PerCorpusK: 3,
			ExcludeIDs: excludeIDs,
		})
		return err
	}))
	if !r.subSteps[1].ok || resp2 == nil {
		r.failedAt = "search_swap"
		r.latency = time.Since(start)
		return r
	}

	// Verify no overlap between original + swap.
	// NOTE(review): only the inner loop breaks on a hit, so the outer loop
	// keeps scanning — harmless at K=3, just a few extra comparisons.
	overlap := false
	for _, a := range resp1.Results {
		for _, b := range resp2.Results {
			if a.ID == b.ID {
				overlap = true
				break
			}
		}
	}
	if overlap {
		r.failedAt = "swap_overlap_detected"
		r.latency = time.Since(start)
		return r
	}

	r.success = true
	r.latency = time.Since(start)
	return r
}
|
||
|
||
// Scenario D: repeat_cache — same query × 5 → measures cache effectiveness
|
||
func (h *harness) scenarioRepeatCache() scenarioResult {
|
||
q := h.queryPool[rand.Intn(len(h.queryPool))]
|
||
r := scenarioResult{name: "repeat_cache"}
|
||
start := time.Now()
|
||
|
||
for i := 0; i < 5; i++ {
|
||
stepName := fmt.Sprintf("search_%d", i)
|
||
var resp *matrixResp
|
||
step := timeStep(stepName, func() error {
|
||
var err error
|
||
resp, err = h.matrixSearch(matrixSearchReq{
|
||
QueryText: q["text"].(string), QueryRole: q["role"].(string),
|
||
Corpora: []string{h.corpus}, K: 3, PerCorpusK: 3,
|
||
})
|
||
return err
|
||
})
|
||
r.subSteps = append(r.subSteps, step)
|
||
if !step.ok || resp == nil {
|
||
r.failedAt = stepName
|
||
r.latency = time.Since(start)
|
||
return r
|
||
}
|
||
}
|
||
|
||
r.success = true
|
||
r.latency = time.Since(start)
|
||
return r
|
||
}
|
||
|
||
// Scenario E: sms_validate — SMS-shaped draft (≤160 chars), validated via
// EmailValidator with kind=sms. Exercises the SMS-length cap and the
// SSN-pattern false-positive guard: the draft deliberately embeds a
// phone number (555-123-4567) that a naive SSN regex would flag.
func (h *harness) scenarioSMSValidate() scenarioResult {
	q := h.queryPool[rand.Intn(len(h.queryPool))]
	worker := h.workerPool[rand.Intn(len(h.workerPool))]
	r := scenarioResult{name: "sms_validate"}
	start := time.Now()

	// Step 1: search (result content is unused; only the call must succeed).
	r.subSteps = append(r.subSteps, timeStep("search", func() error {
		_, err := h.matrixSearch(matrixSearchReq{
			QueryText: q["text"].(string), QueryRole: q["role"].(string),
			Corpora: []string{h.corpus}, K: 1, PerCorpusK: 1,
		})
		return err
	}))
	if !r.subSteps[0].ok {
		r.failedAt = "search"
		r.latency = time.Since(start)
		return r
	}

	// Step 2: SMS draft (deliberately includes a phone number to
	// stress-test the SSN-pattern false-positive guard)
	smsDraft := map[string]any{
		"to":       "+15551234567",
		"body":     fmt.Sprintf("%s confirmed for %s. Reply STOP to cancel. Call 555-123-4567.", worker.Name, q["role"].(string)),
		"kind":     "sms",
		"_context": map[string]any{"candidate_id": worker.CandidateID},
	}

	// Step 3: validate the SMS draft in-process.
	r.subSteps = append(r.subSteps, timeStep("sms_validate", func() error {
		_, err := h.emailVal.Validate(validator.Artifact{EmailDraft: smsDraft})
		return err
	}))
	if !r.subSteps[1].ok {
		r.failedAt = "sms_validate"
		r.validator = "fail"
		r.latency = time.Since(start)
		return r
	}

	r.success = true
	r.validator = "pass"
	r.latency = time.Since(start)
	return r
}
|
||
|
||
// Scenario F: playbook_record_replay — search → record → re-search with
// use_playbook=true. Verifies the learning loop fires through real HTTP.
//
// Adds a unique suffix (nanosecond timestamp + random int) to query_text
// so concurrent goroutines don't race on the same playbook_memory entry
// (vectord HNSW Add has a concurrency edge case on small indexes;
// uniqueness avoids it).
func (h *harness) scenarioPlaybookRecordReplay() scenarioResult {
	q := h.queryPool[rand.Intn(len(h.queryPool))]
	role := q["role"].(string)
	queryText := fmt.Sprintf("%s [run-%d-%d]",
		q["text"].(string), time.Now().UnixNano(), rand.Intn(1000000))
	r := scenarioResult{name: "playbook_record_replay"}
	start := time.Now()

	// Step 1: cold search (must return at least one hit to record).
	var resp1 *matrixResp
	r.subSteps = append(r.subSteps, timeStep("cold_search", func() error {
		var err error
		resp1, err = h.matrixSearch(matrixSearchReq{
			QueryText: queryText, QueryRole: role,
			Corpora: []string{h.corpus}, K: 3, PerCorpusK: 3,
		})
		return err
	}))
	if !r.subSteps[0].ok || resp1 == nil || len(resp1.Results) == 0 {
		r.failedAt = "cold_search"
		r.latency = time.Since(start)
		return r
	}

	// Step 2: record top-1 as playbook.
	topID := resp1.Results[0].ID
	r.subSteps = append(r.subSteps, timeStep("playbook_record", func() error {
		return h.playbookRecord(queryText, role, topID, h.corpus, 1.0)
	}))
	if !r.subSteps[1].ok {
		r.failedAt = "playbook_record"
		r.latency = time.Since(start)
		return r
	}

	// Step 3: warm search with use_playbook=true (result unused; the call
	// succeeding is the signal that replay works end-to-end).
	r.subSteps = append(r.subSteps, timeStep("warm_search", func() error {
		_, err := h.matrixSearch(matrixSearchReq{
			QueryText: queryText, QueryRole: role,
			Corpora: []string{h.corpus}, K: 3, PerCorpusK: 3,
			UsePlaybook: true,
		})
		return err
	}))
	if !r.subSteps[2].ok {
		r.failedAt = "warm_search"
		r.latency = time.Since(start)
		return r
	}

	r.success = true
	r.latency = time.Since(start)
	return r
}
|
||
|
||
// pickScenario rolls a scenario based on a realistic workflow mix.
|
||
func (h *harness) pickScenario() func() scenarioResult {
|
||
roll := rand.Intn(100)
|
||
switch {
|
||
case roll < 35: // 35% — fresh demand searches
|
||
return h.scenarioColdSearchEmail
|
||
case roll < 50: // 15% — surge fills with validation
|
||
return h.scenarioSurgeFillValidate
|
||
case roll < 65: // 15% — profile swaps
|
||
return h.scenarioProfileSwap
|
||
case roll < 80: // 15% — repeat queries (cache)
|
||
return h.scenarioRepeatCache
|
||
case roll < 90: // 10% — SMS drafts
|
||
return h.scenarioSMSValidate
|
||
default: // 10% — playbook record + replay
|
||
return h.scenarioPlaybookRecordReplay
|
||
}
|
||
}
|
||
|
||
// ── Reporter ──────────────────────────────────────────────────────
|
||
|
||
// latStats accumulates latency samples for one scenario so the reporter
// can compute counts, totals, and percentiles.
type latStats struct {
	count     int             // number of samples recorded
	sum       time.Duration   // running total of all samples
	latencies []time.Duration // raw samples; sorted in place by percentile
}
|
||
|
||
func (s *latStats) add(d time.Duration) {
|
||
s.count++
|
||
s.sum += d
|
||
s.latencies = append(s.latencies, d)
|
||
}
|
||
func (s *latStats) percentile(p float64) time.Duration {
|
||
if len(s.latencies) == 0 {
|
||
return 0
|
||
}
|
||
sort.Slice(s.latencies, func(i, j int) bool { return s.latencies[i] < s.latencies[j] })
|
||
idx := int(p * float64(len(s.latencies)))
|
||
if idx >= len(s.latencies) {
|
||
idx = len(s.latencies) - 1
|
||
}
|
||
return s.latencies[idx]
|
||
}
|
||
|
||
// ── Main ──────────────────────────────────────────────────────────
|
||
|
||
func main() {
|
||
gateway := flag.String("gateway", "http://127.0.0.1:4110", "Go gateway base URL")
|
||
conc := flag.Int("concurrency", 50, "concurrent workers")
|
||
dur := flag.Duration("duration", 5*time.Minute, "load duration")
|
||
corpus := flag.String("corpus", "workers", "vectord index name")
|
||
flag.Parse()
|
||
|
||
rand.Seed(time.Now().UnixNano())
|
||
h := newHarness(*gateway, *corpus)
|
||
|
||
// Sanity: hit /v1/matrix/search once to verify the corpus is up.
|
||
if _, err := h.matrixSearch(matrixSearchReq{
|
||
QueryText: "test", Corpora: []string{*corpus}, K: 1, PerCorpusK: 1,
|
||
}); err != nil {
|
||
log.Fatalf("[multitier] sanity probe failed: %v\n is the persistent stack up + workers ingested?", err)
|
||
}
|
||
|
||
results := make(chan scenarioResult, *conc*2)
|
||
stop := make(chan struct{})
|
||
var wg sync.WaitGroup
|
||
var totalRuns int64
|
||
|
||
for w := 0; w < *conc; w++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
for {
|
||
select {
|
||
case <-stop:
|
||
return
|
||
default:
|
||
}
|
||
fn := h.pickScenario()
|
||
res := fn()
|
||
results <- res
|
||
atomic.AddInt64(&totalRuns, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// Reporter goroutine.
|
||
all := []scenarioResult{}
|
||
doneCollect := make(chan struct{})
|
||
go func() {
|
||
for r := range results {
|
||
all = append(all, r)
|
||
}
|
||
close(doneCollect)
|
||
}()
|
||
|
||
log.Printf("[multitier] gateway=%s · concurrency=%d · duration=%v · corpus=%s",
|
||
*gateway, *conc, *dur, *corpus)
|
||
log.Printf("[multitier] scenarios: cold_search_email (35%%) · surge_fill (15%%) · swap (15%%) · repeat (15%%) · sms (10%%) · record_replay (10%%)")
|
||
startWall := time.Now()
|
||
time.Sleep(*dur)
|
||
close(stop)
|
||
wg.Wait()
|
||
close(results)
|
||
<-doneCollect
|
||
wallElapsed := time.Since(startWall)
|
||
|
||
report(all, wallElapsed, *conc)
|
||
}
|
||
|
||
func report(all []scenarioResult, wall time.Duration, concurrency int) {
|
||
fmt.Printf("\n══ multitier load report ══\n")
|
||
fmt.Printf("wall: %v · concurrency=%d\n", wall.Round(time.Millisecond), concurrency)
|
||
fmt.Printf("total scenarios: %d (%.1f/sec)\n", len(all), float64(len(all))/wall.Seconds())
|
||
if len(all) == 0 {
|
||
return
|
||
}
|
||
|
||
// Per-scenario breakdown
|
||
type scStats struct {
|
||
name string
|
||
runs int
|
||
fails int
|
||
validator map[string]int
|
||
failedAt map[string]int
|
||
lat latStats
|
||
}
|
||
bucket := map[string]*scStats{}
|
||
for _, r := range all {
|
||
s, ok := bucket[r.name]
|
||
if !ok {
|
||
s = &scStats{name: r.name, validator: map[string]int{}, failedAt: map[string]int{}}
|
||
bucket[r.name] = s
|
||
}
|
||
s.runs++
|
||
s.lat.add(r.latency)
|
||
if !r.success {
|
||
s.fails++
|
||
if r.failedAt != "" {
|
||
s.failedAt[r.failedAt]++
|
||
}
|
||
}
|
||
if r.validator != "" {
|
||
s.validator[r.validator]++
|
||
}
|
||
}
|
||
|
||
// Print per-scenario table
|
||
names := []string{"cold_search_email", "surge_fill_validate", "profile_swap", "repeat_cache", "sms_validate", "playbook_record_replay"}
|
||
fmt.Println()
|
||
fmt.Printf("%-26s %8s %7s %8s %8s %8s %8s\n",
|
||
"scenario", "runs", "fail%", "p50", "p95", "p99", "max")
|
||
for _, n := range names {
|
||
s, ok := bucket[n]
|
||
if !ok {
|
||
continue
|
||
}
|
||
failPct := 100 * float64(s.fails) / float64(s.runs)
|
||
fmt.Printf("%-26s %8d %6.1f%% %8v %8v %8v %8v\n",
|
||
n, s.runs, failPct,
|
||
s.lat.percentile(0.50).Round(time.Microsecond),
|
||
s.lat.percentile(0.95).Round(time.Microsecond),
|
||
s.lat.percentile(0.99).Round(time.Microsecond),
|
||
s.lat.percentile(1.00).Round(time.Microsecond),
|
||
)
|
||
}
|
||
|
||
// Failure breakdown
|
||
totalFails := 0
|
||
for _, s := range bucket {
|
||
totalFails += s.fails
|
||
}
|
||
if totalFails > 0 {
|
||
fmt.Println("\n── failure-step breakdown ──")
|
||
for _, n := range names {
|
||
s, ok := bucket[n]
|
||
if !ok || s.fails == 0 {
|
||
continue
|
||
}
|
||
fmt.Printf(" %s:\n", n)
|
||
for step, count := range s.failedAt {
|
||
fmt.Printf(" %s: %d\n", step, count)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Validator pass/fail
|
||
fmt.Println("\n── validator outcomes ──")
|
||
for _, n := range names {
|
||
s, ok := bucket[n]
|
||
if !ok || len(s.validator) == 0 {
|
||
continue
|
||
}
|
||
var keys []string
|
||
for k := range s.validator {
|
||
keys = append(keys, k)
|
||
}
|
||
sort.Strings(keys)
|
||
var parts []string
|
||
for _, k := range keys {
|
||
parts = append(parts, fmt.Sprintf("%s=%d", k, s.validator[k]))
|
||
}
|
||
fmt.Printf(" %-26s %s\n", n, strings.Join(parts, " · "))
|
||
}
|
||
|
||
// JSON summary on stderr for parsability
|
||
type summary struct {
|
||
WallSec float64 `json:"wall_sec"`
|
||
Total int `json:"total_scenarios"`
|
||
Failures int `json:"failures"`
|
||
PerScenario map[string]struct {
|
||
Runs int `json:"runs"`
|
||
Fails int `json:"fails"`
|
||
P50Ms float64 `json:"p50_ms"`
|
||
P99Ms float64 `json:"p99_ms"`
|
||
} `json:"per_scenario"`
|
||
}
|
||
s := summary{
|
||
WallSec: wall.Seconds(), Total: len(all), Failures: totalFails,
|
||
PerScenario: map[string]struct {
|
||
Runs int `json:"runs"`
|
||
Fails int `json:"fails"`
|
||
P50Ms float64 `json:"p50_ms"`
|
||
P99Ms float64 `json:"p99_ms"`
|
||
}{},
|
||
}
|
||
for _, n := range names {
|
||
st, ok := bucket[n]
|
||
if !ok {
|
||
continue
|
||
}
|
||
s.PerScenario[n] = struct {
|
||
Runs int `json:"runs"`
|
||
Fails int `json:"fails"`
|
||
P50Ms float64 `json:"p50_ms"`
|
||
P99Ms float64 `json:"p99_ms"`
|
||
}{
|
||
Runs: st.runs, Fails: st.fails,
|
||
P50Ms: float64(st.lat.percentile(0.50).Microseconds()) / 1000,
|
||
P99Ms: float64(st.lat.percentile(0.99).Microseconds()) / 1000,
|
||
}
|
||
}
|
||
enc, _ := json.MarshalIndent(s, "", " ")
|
||
fmt.Fprintf(os.Stderr, "\n%s\n", enc)
|
||
}
|