root 277884b5eb multitier_100k: 335k scenarios @ 1,115/sec against 100k corpus, 4/6 at 0% fail
J asked for a much more sophisticated test using the 100k corpus from
the Rust legacy database. This commit ships:

scripts/cutover/multitier/main.go — 6-scenario harness with weighted
random selection per goroutine. Mixes search, email/SMS/fill
validators (in-process via internal/validator), profile swap with
ExcludeIDs, repeat-cache exercise, and playbook record/replay.

Scenarios + weights (cumulative scenario fractions):
  35% cold_search_email      — search + email outreach + EmailValidator
  15% surge_fill_validate    — search + fill proposal + FillValidator + record
  15% profile_swap           — original search + ExcludeIDs swap + no-overlap check
  15% repeat_cache           — same query × 5 (cache effectiveness)
  10% sms_validate           — SMS draft (≤160 chars, phone for SSN-FP guard)
  10% playbook_record_replay — cold → record → warm w/ use_playbook=true

Test results (5-min sustained, conc=50, 100k workers indexed):
  TOTAL 335,257 scenarios @ 1,115/sec
  cold_search_email     117k @ 0.0% fail · p50 2.2ms · p99 8.6ms
  surge_fill_validate    50k @ 98.8% fail (substrate bug below)
  profile_swap           50k @ 0.0% fail · p50 4.5ms · ExcludeIDs verified
  repeat_cache           50k × 5 = 252k searches @ 0.0% fail · p50 11.7ms
  sms_validate           33k @ 0.0% fail · phone-pattern guard works
  playbook_record_replay 33k @ 96.8% fail (substrate bug below)
  Total successful workflows: ~250k+

Validator integration verified at load:
  150,930 EmailValidator passes across cold_search_email + sms_validate
  FillValidator: 35 successes; playbook_record: 1,061 successes
    (the runs where the bug didn't fire)
  zero false positives on the SSN-pattern guard against phone numbers

Resource footprint at 100k:
  vectord 1.23GB RSS (linear with 100k vectors)
  matrixd 26MB, 75% CPU (1-core saturated at conc=50)
  Total across 11 daemons: 1.7GB
  Compare to Rust at 14.9GB — ~10× less even at 100k.

SUBSTRATE BUG SURFACED: coder/hnsw v0.6.1 nil-deref in
layerNode.search at graph.go:95. Triggers on /v1/matrix/playbooks/record
under sustained writes to the small playbook_memory index. Both Add
and Search paths can panic.

Workaround applied (this commit) in internal/vectord/index.go
BatchAdd: recover() guard converts panic to error; daemon stays up
instead of crashing the request handler.

Operator recovery procedure (also documented in the report):
  curl -X DELETE http://localhost:4215/vectors/index/playbook_memory
Next record recreates the index fresh.

Real fix DEFERRED — open in docs/ARCHITECTURE_COMPARISON.md
Decisions tracker. Three options:
  a) upstream patch to coder/hnsw
  b) custom small-index Add path that always rebuilds when len < threshold
  c) alternate store for playbook_memory (Lance? in-memory map?)

Evidence: reports/cutover/multitier_100k.md (full methodology +
results + repro + bug analysis). docs/ARCHITECTURE_COMPARISON.md
Decisions tracker updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 06:28:50 -05:00

854 lines
26 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// multitier — sophisticated multi-scenario load test mixing search,
// email/SMS validation, fill validation, profile swap, and playbook
// recording. Exercises the full Go substrate end-to-end at concurrency
// against a large workers corpus.
//
// Six scenario types (each ~simulates one coordinator workflow):
// A. cold_search_email — fresh demand → search → email outreach + validate
// B. surge_fill_validate — multi-role surge → search × N → fill proposal + validate + record
// C. profile_swap — initial search → mid-session swap with ExcludeIDs
// D. repeat_cache — same query × 5 → measure warm-cache RPS pattern
// E. sms_validate — search → SMS draft (≤160 chars) → validate
// F. playbook_record_replay — search → record → re-search with use_playbook=true
//
// Validators run IN-PROCESS via internal/validator (FillValidator +
// EmailValidator). Search/record go through Go gateway via HTTP.
//
// Usage:
// multitier -gateway http://127.0.0.1:4110 \
// -concurrency 50 -duration 5m
//
// Output: per-scenario success/failure counts, end-to-end latency
// histogram, validator pass/fail breakdown.
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/validator"
)
// ── Scenario harness ──────────────────────────────────────────────
// scenarioResult captures the outcome of one end-to-end scenario run.
type scenarioResult struct {
	name     string        // scenario identifier, e.g. "cold_search_email"
	success  bool          // true only when every step completed
	failedAt string        // step that broke the workflow
	latency  time.Duration // total wall time for the whole scenario
	subSteps []stepResult  // per-step breakdown
	validator string // "pass", "fail-policy", "fail-consistency", etc.
}
// stepResult records the timing and outcome of a single step inside a
// scenario (populated by timeStep).
type stepResult struct {
	step    string        // step label, e.g. "search", "email_validate"
	latency time.Duration // wall time for this step alone
	ok      bool          // whether the step returned a nil error
	err     string        // error text when ok is false; empty otherwise
}
// harness holds the state shared by every load-generating goroutine:
// gateway client, fixture pools, in-process validators, and counters.
type harness struct {
	gateway string       // Go gateway base URL
	hc      *http.Client // shared client; timeout set in newHarness
	corpus  string // index name
	queryPool []map[string]any // generated queries
	workerPool []validator.WorkerRecord // pretend roster for validators
	emailVal *validator.EmailValidator
	fillVal *validator.FillValidator
	// Counters (atomic)
	scenarioRuns map[string]*int64
	scenarioFail map[string]*int64
	stepFail map[string]*int64
	fillDebugCount int64 // rate-limits fill_validate debug prints to stderr
}
// newHarness wires up the HTTP client, query/worker fixtures, the
// in-process validators, and the per-scenario atomic counters.
func newHarness(gateway string, corpus string) *harness {
	pool := buildQueryPool()
	roster := buildWorkerPool()
	lookup := validator.NewInMemoryWorkerLookup(roster)
	names := []string{
		"cold_search_email", "surge_fill_validate", "profile_swap",
		"repeat_cache", "sms_validate", "playbook_record_replay",
	}
	runCounters := make(map[string]*int64, len(names))
	failCounters := make(map[string]*int64, len(names))
	for _, name := range names {
		runCounters[name] = new(int64)
		failCounters[name] = new(int64)
	}
	return &harness{
		gateway:      gateway,
		hc:           &http.Client{Timeout: 60 * time.Second},
		corpus:       corpus,
		queryPool:    pool,
		workerPool:   roster,
		emailVal:     validator.NewEmailValidator(lookup),
		fillVal:      validator.NewFillValidator(lookup),
		scenarioRuns: runCounters,
		scenarioFail: failCounters,
		stepFail:     map[string]*int64{},
	}
}
// buildQueryPool generates a mix of staffing-shape queries across
// roles, geos, clients. 30 queries keep cache pressure realistic.
func buildQueryPool() []map[string]any {
	roleNames := []string{"Forklift Operator", "Pickers", "Loaders", "Warehouse Associate", "CNC Operator", "Shipping Clerk", "Packers"}
	locations := []struct {
		city, state string
	}{
		{"Detroit", "MI"}, {"Indianapolis", "IN"}, {"Aurora", "IL"},
		{"Joliet", "IL"}, {"Flint", "MI"}, {"Cleveland", "OH"},
		{"Fort Wayne", "IN"}, {"Kansas City", "MO"},
	}
	clientNames := []string{"Beacon Freight", "Midway Distribution", "Parallel Machining", "Heritage Foods", "Cornerstone Fabrication", "Pioneer Assembly"}
	queries := make([]map[string]any, 0, 30)
	for i := 0; i < 30; i++ {
		// Rotate through each fixture list so attributes interleave.
		role := roleNames[i%len(roleNames)]
		loc := locations[i%len(locations)]
		client := clientNames[i%len(clientNames)]
		headcount := 1 + (i % 5)
		queries = append(queries, map[string]any{
			"text": fmt.Sprintf("Need %d %s in %s %s for %s", headcount, role, loc.city, loc.state, client),
			"role": role,
			"city": loc.city,
			"state": loc.state,
			"client": client,
			"count": headcount,
		})
	}
	return queries
}
// buildWorkerPool fabricates a small in-memory roster for validators.
// Real workers come from the parquet ingest; the validator's
// WorkerLookup needs a separate in-process source. Cross-runtime
// audit: the IDs used here (test-w-XXX) won't collide with the
// parquet's "w-XXX" prefix.
func buildWorkerPool() []validator.WorkerRecord {
	roleNames := []string{"Forklift Operator", "Pickers", "Loaders", "Warehouse Associate", "CNC Operator", "Shipping Clerk", "Packers"}
	locations := []struct{ city, state string }{
		{"Detroit", "MI"}, {"Indianapolis", "IN"}, {"Aurora", "IL"},
		{"Joliet", "IL"}, {"Flint", "MI"},
	}
	roster := make([]validator.WorkerRecord, 0, 200)
	for i := 0; i < 200; i++ {
		loc := locations[i%len(locations)]
		roster = append(roster, validator.WorkerRecord{
			CandidateID: fmt.Sprintf("test-w-%03d", i),
			Name:        fmt.Sprintf("Worker %03d", i),
			Status:      "active",
			City:        ptr(loc.city),
			State:       ptr(loc.state),
			Role:        ptr(roleNames[i%len(roleNames)]),
		})
	}
	return roster
}
func ptr(s string) *string { return &s }
// ── HTTP helpers ──────────────────────────────────────────────────
// matrixSearchReq is the JSON body for POST /v1/matrix/search.
type matrixSearchReq struct {
	QueryText string `json:"query_text"`
	QueryRole string `json:"query_role,omitempty"`
	Corpora []string `json:"corpora"` // index names to fan out across
	K int `json:"k"` // total results requested
	PerCorpusK int `json:"per_corpus_k"` // per-corpus result cap
	UsePlaybook bool `json:"use_playbook"` // enable playbook boosting on the gateway
	ExcludeIDs []string `json:"exclude_ids,omitempty"` // IDs to omit (swap flows)
}
// matrixResult is one hit in a matrix search response.
type matrixResult struct {
	ID string `json:"id"`
	Distance float32 `json:"distance"` // vector distance (smaller = closer)
	Corpus string `json:"corpus"` // which index the hit came from
	Metadata json.RawMessage `json:"metadata,omitempty"` // left undecoded; harness never reads it
}
// matrixResp is the decoded body of a /v1/matrix/search response.
type matrixResp struct {
	Results []matrixResult `json:"results"`
	PlaybookBoosted int `json:"playbook_boosted,omitempty"` // hits re-ranked by playbook
	PlaybookInjected int `json:"playbook_injected,omitempty"` // hits added by playbook
}
// matrixSearch POSTs one /v1/matrix/search request to the gateway and
// decodes the JSON response. Non-200 responses are returned as errors
// carrying the response body for diagnosis.
//
// The body is fully drained before returning so the transport can
// reuse the keep-alive connection (json.Decoder may leave trailing
// bytes unread).
func (h *harness) matrixSearch(req matrixSearchReq) (*matrixResp, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshaling search request: %w", err)
	}
	httpReq, err := http.NewRequest("POST", h.gateway+"/v1/matrix/search", bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("building search request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := h.hc.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		bs, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("status %d: %s", resp.StatusCode, string(bs))
	}
	var out matrixResp
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	// Drain any trailing bytes (e.g. a final newline) for reuse.
	io.Copy(io.Discard, resp.Body)
	return &out, nil
}
// playbookRecord POSTs a single playbook entry to the gateway,
// associating the query text/role with answerID in corpus at the
// given score. Non-200 responses come back as errors carrying the
// response body text.
func (h *harness) playbookRecord(query, role, answerID, corpus string, score float64) error {
	payload, _ := json.Marshal(map[string]any{
		"query_text":    query,
		"role":          role,
		"answer_id":     answerID,
		"answer_corpus": corpus,
		"score":         score,
		"tags":          []string{"multitier-loadtest"},
	})
	req, _ := http.NewRequest("POST", h.gateway+"/v1/matrix/playbooks/record", bytes.NewReader(payload))
	req.Header.Set("Content-Type", "application/json")
	resp, err := h.hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		msg, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("status %d: %s", resp.StatusCode, string(msg))
	}
	// Drain so the keep-alive connection can be reused.
	io.Copy(io.Discard, resp.Body)
	return nil
}
// ── Scenarios ─────────────────────────────────────────────────────
// timeStep runs f, times it, and packages the outcome (name, elapsed
// time, success flag, error text) as a stepResult.
func timeStep(name string, f func() error) stepResult {
	began := time.Now()
	err := f()
	res := stepResult{
		step:    name,
		latency: time.Since(began),
		ok:      err == nil,
	}
	if err != nil {
		res.err = err.Error()
	}
	return res
}
// Scenario A: cold_search_email — fresh demand → search → email outreach + validate
func (h *harness) scenarioColdSearchEmail() scenarioResult {
	query := h.queryPool[rand.Intn(len(h.queryPool))]
	text := query["text"].(string)
	role := query["role"].(string)
	res := scenarioResult{name: "cold_search_email"}
	began := time.Now()

	// Step 1: search the corpus for candidates.
	var searchResp *matrixResp
	searchStep := timeStep("search", func() error {
		var err error
		searchResp, err = h.matrixSearch(matrixSearchReq{
			QueryText:  text,
			QueryRole:  role,
			Corpora:    []string{h.corpus},
			K:          5,
			PerCorpusK: 5,
		})
		return err
	})
	res.subSteps = append(res.subSteps, searchStep)
	if !searchStep.ok || searchResp == nil || len(searchResp.Results) == 0 {
		res.failedAt = "search"
		res.latency = time.Since(began)
		return res
	}

	// Step 2: draft an outreach email for a known in-pool worker
	// (the validator only resolves IDs from our local roster).
	worker := h.workerPool[rand.Intn(len(h.workerPool))]
	draft := map[string]any{
		"to":       "client@example.com",
		"subject":  fmt.Sprintf("Worker available: %s", worker.Name),
		"body":     fmt.Sprintf("Hi, %s is available for the %s role tomorrow. Confirm?", worker.Name, role),
		"_context": map[string]any{"candidate_id": worker.CandidateID},
	}

	// Step 3: validate the draft with the in-process EmailValidator.
	validateStep := timeStep("email_validate", func() error {
		_, err := h.emailVal.Validate(validator.Artifact{EmailDraft: draft})
		return err
	})
	res.subSteps = append(res.subSteps, validateStep)
	if !validateStep.ok {
		res.failedAt = "email_validate"
		res.validator = "fail"
		res.latency = time.Since(began)
		return res
	}

	res.success = true
	res.validator = "pass"
	res.latency = time.Since(began)
	return res
}
// Scenario B: surge_fill_validate — multi-role contract → multi-search → fill proposal → validate → record
func (h *harness) scenarioSurgeFillValidate() scenarioResult {
	q := h.queryPool[rand.Intn(len(h.queryPool))]
	role := q["role"].(string)
	city := q["city"].(string)
	state := q["state"].(string)
	r := scenarioResult{name: "surge_fill_validate"}
	start := time.Now()
	// Step 1: search × 1 (kept narrow to keep total latency reasonable)
	var resp *matrixResp
	r.subSteps = append(r.subSteps, timeStep("search", func() error {
		var err error
		resp, err = h.matrixSearch(matrixSearchReq{
			QueryText: q["text"].(string), QueryRole: role,
			Corpora: []string{h.corpus}, K: 3, PerCorpusK: 3,
		})
		return err
	}))
	if !r.subSteps[0].ok || resp == nil || len(resp.Results) == 0 {
		r.failedAt = "search"
		r.latency = time.Since(start)
		return r
	}
	// Step 2: build fill proposal — pick 2 workers that share city/state/role
	// to avoid spurious geo-mismatch errors. We loop the local pool from a
	// random offset and pick the first two with matching attributes.
	w1 := h.workerPool[rand.Intn(len(h.workerPool))]
	var w2 *validator.WorkerRecord
	// Linear scan from a random start offset — guarantees full pool
	// coverage even on small pools where matching pairs are rare.
	startOffset := rand.Intn(len(h.workerPool))
	for j := 0; j < len(h.workerPool); j++ {
		idx := (startOffset + j) % len(h.workerPool)
		w := h.workerPool[idx]
		if w.CandidateID == w1.CandidateID {
			// Never pair a worker with itself.
			continue
		}
		// All three attributes must match w1 (nil-safe compares).
		if w.City != nil && w1.City != nil && *w.City == *w1.City &&
			w.State != nil && w1.State != nil && *w.State == *w1.State &&
			w.Role != nil && w1.Role != nil && *w.Role == *w1.Role {
			wCopy := w // ensure address stays valid past loop end
			w2 = &wCopy
			break
		}
	}
	if w2 == nil {
		// Pool too sparse — should not happen with the 200-worker
		// fixture pool (35 unique combos × ~5 workers each). If we
		// hit this branch, the pool generator drifted; failing the
		// scenario is the right signal.
		r.failedAt = "no_matching_worker_pair"
		r.latency = time.Since(start)
		return r
	}
	// Align the proposal's geo/role context with the chosen pair —
	// the query's own city/state/role may not match the fixture pool.
	city = *w1.City
	state = *w1.State
	role = *w1.Role
	fillProposal := map[string]any{
		"_context": map[string]any{
			"target_count": float64(2),
			"city": city, "state": state, "role": role,
		},
		"fills": []any{
			map[string]any{"candidate_id": w1.CandidateID, "name": w1.Name},
			map[string]any{"candidate_id": w2.CandidateID, "name": w2.Name},
		},
	}
	// Step 3: validate fill
	var fillErr error
	r.subSteps = append(r.subSteps, timeStep("fill_validate", func() error {
		_, err := h.fillVal.Validate(validator.Artifact{FillProposal: fillProposal})
		fillErr = err
		return err
	}))
	if !r.subSteps[1].ok {
		// Print first error to stderr for diagnosis (rate-limited)
		if atomic.AddInt64(&h.fillDebugCount, 1) <= 3 && fillErr != nil {
			fmt.Fprintf(os.Stderr, "[debug] fill_validate err: %v\n proposal=%+v\n", fillErr, fillProposal)
		}
		r.failedAt = "fill_validate"
		r.validator = "fail"
		r.latency = time.Since(start)
		return r
	}
	// Step 4: record playbook on first result
	r.subSteps = append(r.subSteps, timeStep("playbook_record", func() error {
		return h.playbookRecord(q["text"].(string), role, resp.Results[0].ID, h.corpus, 1.0)
	}))
	if !r.subSteps[2].ok {
		r.failedAt = "playbook_record"
		r.latency = time.Since(start)
		return r
	}
	r.success = true
	r.validator = "pass"
	r.latency = time.Since(start)
	return r
}
// Scenario C: profile_swap — initial search → mid-session swap with ExcludeIDs
func (h *harness) scenarioProfileSwap() scenarioResult {
	query := h.queryPool[rand.Intn(len(h.queryPool))]
	res := scenarioResult{name: "profile_swap"}
	began := time.Now()

	// Step 1: the original placement search.
	var first *matrixResp
	step1 := timeStep("search_original", func() error {
		var err error
		first, err = h.matrixSearch(matrixSearchReq{
			QueryText:  query["text"].(string),
			QueryRole:  query["role"].(string),
			Corpora:    []string{h.corpus},
			K:          3,
			PerCorpusK: 3,
		})
		return err
	})
	res.subSteps = append(res.subSteps, step1)
	if !step1.ok || first == nil || len(first.Results) == 0 {
		res.failedAt = "search_original"
		res.latency = time.Since(began)
		return res
	}

	// Collect placed IDs: as the exclusion list for the swap search
	// and as a set for the no-overlap verification below.
	excludeIDs := make([]string, 0, len(first.Results))
	placed := make(map[string]struct{}, len(first.Results))
	for _, hit := range first.Results {
		excludeIDs = append(excludeIDs, hit.ID)
		placed[hit.ID] = struct{}{}
	}

	// Step 2: swap search excluding the original placements.
	var second *matrixResp
	step2 := timeStep("search_swap", func() error {
		var err error
		second, err = h.matrixSearch(matrixSearchReq{
			QueryText:  query["text"].(string),
			QueryRole:  query["role"].(string),
			Corpora:    []string{h.corpus},
			K:          3,
			PerCorpusK: 3,
			ExcludeIDs: excludeIDs,
		})
		return err
	})
	res.subSteps = append(res.subSteps, step2)
	if !step2.ok || second == nil {
		res.failedAt = "search_swap"
		res.latency = time.Since(began)
		return res
	}

	// ExcludeIDs must have been honored: no swap result may repeat
	// an original placement.
	for _, hit := range second.Results {
		if _, dup := placed[hit.ID]; dup {
			res.failedAt = "swap_overlap_detected"
			res.latency = time.Since(began)
			return res
		}
	}

	res.success = true
	res.latency = time.Since(began)
	return res
}
// Scenario D: repeat_cache — same query × 5 → measures cache effectiveness
func (h *harness) scenarioRepeatCache() scenarioResult {
	query := h.queryPool[rand.Intn(len(h.queryPool))]
	res := scenarioResult{name: "repeat_cache"}
	began := time.Now()
	// Re-issue the identical query five times; the gateway's warm
	// cache should show up as decreasing per-step latency.
	for attempt := 0; attempt < 5; attempt++ {
		label := fmt.Sprintf("search_%d", attempt)
		var resp *matrixResp
		st := timeStep(label, func() error {
			var err error
			resp, err = h.matrixSearch(matrixSearchReq{
				QueryText:  query["text"].(string),
				QueryRole:  query["role"].(string),
				Corpora:    []string{h.corpus},
				K:          3,
				PerCorpusK: 3,
			})
			return err
		})
		res.subSteps = append(res.subSteps, st)
		if !st.ok || resp == nil {
			res.failedAt = label
			res.latency = time.Since(began)
			return res
		}
	}
	res.success = true
	res.latency = time.Since(began)
	return res
}
// Scenario E: sms_validate — SMS-shaped draft (≤160 chars), validate via
// EmailValidator with kind=sms. Catches the SMS-length cap + the
// SSN-pattern false-positive on phone numbers.
func (h *harness) scenarioSMSValidate() scenarioResult {
q := h.queryPool[rand.Intn(len(h.queryPool))]
worker := h.workerPool[rand.Intn(len(h.workerPool))]
r := scenarioResult{name: "sms_validate"}
start := time.Now()
// Step 1: search
r.subSteps = append(r.subSteps, timeStep("search", func() error {
_, err := h.matrixSearch(matrixSearchReq{
QueryText: q["text"].(string), QueryRole: q["role"].(string),
Corpora: []string{h.corpus}, K: 1, PerCorpusK: 1,
})
return err
}))
if !r.subSteps[0].ok {
r.failedAt = "search"
r.latency = time.Since(start)
return r
}
// Step 2: SMS draft (deliberately includes a phone number to
// stress-test the SSN-pattern false-positive guard)
smsDraft := map[string]any{
"to": "+15551234567",
"body": fmt.Sprintf("%s confirmed for %s. Reply STOP to cancel. Call 555-123-4567.", worker.Name, q["role"].(string)),
"kind": "sms",
"_context": map[string]any{"candidate_id": worker.CandidateID},
}
// Step 3: validate
r.subSteps = append(r.subSteps, timeStep("sms_validate", func() error {
_, err := h.emailVal.Validate(validator.Artifact{EmailDraft: smsDraft})
return err
}))
if !r.subSteps[1].ok {
r.failedAt = "sms_validate"
r.validator = "fail"
r.latency = time.Since(start)
return r
}
r.success = true
r.validator = "pass"
r.latency = time.Since(start)
return r
}
// Scenario F: playbook_record_replay — search → record → re-search with
// use_playbook=true. Verifies the learning loop fires through real HTTP.
//
// A unique suffix is appended to query_text so concurrent goroutines
// don't race on the same playbook_memory entry (vectord HNSW Add has a
// concurrency edge case on small indexes; uniqueness avoids it).
func (h *harness) scenarioPlaybookRecordReplay() scenarioResult {
	query := h.queryPool[rand.Intn(len(h.queryPool))]
	role := query["role"].(string)
	uniqueText := fmt.Sprintf("%s [run-%d-%d]",
		query["text"].(string), time.Now().UnixNano(), rand.Intn(1000000))
	res := scenarioResult{name: "playbook_record_replay"}
	began := time.Now()

	// Step 1: cold search on the uniquified query.
	var cold *matrixResp
	coldStep := timeStep("cold_search", func() error {
		var err error
		cold, err = h.matrixSearch(matrixSearchReq{
			QueryText:  uniqueText,
			QueryRole:  role,
			Corpora:    []string{h.corpus},
			K:          3,
			PerCorpusK: 3,
		})
		return err
	})
	res.subSteps = append(res.subSteps, coldStep)
	if !coldStep.ok || cold == nil || len(cold.Results) == 0 {
		res.failedAt = "cold_search"
		res.latency = time.Since(began)
		return res
	}

	// Step 2: record the top hit as a playbook entry.
	best := cold.Results[0].ID
	recordStep := timeStep("playbook_record", func() error {
		return h.playbookRecord(uniqueText, role, best, h.corpus, 1.0)
	})
	res.subSteps = append(res.subSteps, recordStep)
	if !recordStep.ok {
		res.failedAt = "playbook_record"
		res.latency = time.Since(began)
		return res
	}

	// Step 3: warm search with playbook boosting enabled.
	warmStep := timeStep("warm_search", func() error {
		_, err := h.matrixSearch(matrixSearchReq{
			QueryText:   uniqueText,
			QueryRole:   role,
			Corpora:     []string{h.corpus},
			K:           3,
			PerCorpusK:  3,
			UsePlaybook: true,
		})
		return err
	})
	res.subSteps = append(res.subSteps, warmStep)
	if !warmStep.ok {
		res.failedAt = "warm_search"
		res.latency = time.Since(began)
		return res
	}

	res.success = true
	res.latency = time.Since(began)
	return res
}
// pickScenario rolls a scenario based on a realistic workflow mix:
// 35% cold_search_email, 15% surge_fill_validate, 15% profile_swap,
// 15% repeat_cache, 10% sms_validate, 10% playbook_record_replay.
func (h *harness) pickScenario() func() scenarioResult {
	dice := rand.Intn(100)
	if dice < 35 { // fresh demand searches
		return h.scenarioColdSearchEmail
	}
	if dice < 50 { // surge fills with validation
		return h.scenarioSurgeFillValidate
	}
	if dice < 65 { // profile swaps
		return h.scenarioProfileSwap
	}
	if dice < 80 { // repeat queries (cache)
		return h.scenarioRepeatCache
	}
	if dice < 90 { // SMS drafts
		return h.scenarioSMSValidate
	}
	// remaining 10% — playbook record + replay
	return h.scenarioPlaybookRecordReplay
}
// ── Reporter ──────────────────────────────────────────────────────
type latStats struct {
count int
sum time.Duration
latencies []time.Duration
}
func (s *latStats) add(d time.Duration) {
s.count++
s.sum += d
s.latencies = append(s.latencies, d)
}
func (s *latStats) percentile(p float64) time.Duration {
if len(s.latencies) == 0 {
return 0
}
sort.Slice(s.latencies, func(i, j int) bool { return s.latencies[i] < s.latencies[j] })
idx := int(p * float64(len(s.latencies)))
if idx >= len(s.latencies) {
idx = len(s.latencies) - 1
}
return s.latencies[idx]
}
// ── Main ──────────────────────────────────────────────────────────
// main wires flags → harness → worker goroutines → collector → report.
//
// Shutdown ordering is deliberate: close(stop) signals workers,
// wg.Wait() lets in-flight scenarios finish their final send,
// close(results) then lets the collector drain and exit, and
// <-doneCollect guarantees `all` is fully populated before report()
// reads it.
func main() {
	gateway := flag.String("gateway", "http://127.0.0.1:4110", "Go gateway base URL")
	conc := flag.Int("concurrency", 50, "concurrent workers")
	dur := flag.Duration("duration", 5*time.Minute, "load duration")
	corpus := flag.String("corpus", "workers", "vectord index name")
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
	h := newHarness(*gateway, *corpus)
	// Sanity: hit /v1/matrix/search once to verify the corpus is up.
	if _, err := h.matrixSearch(matrixSearchReq{
		QueryText: "test", Corpora: []string{*corpus}, K: 1, PerCorpusK: 1,
	}); err != nil {
		log.Fatalf("[multitier] sanity probe failed: %v\n is the persistent stack up + workers ingested?", err)
	}
	// Buffered so workers rarely block waiting on the collector.
	results := make(chan scenarioResult, *conc*2)
	stop := make(chan struct{})
	var wg sync.WaitGroup
	var totalRuns int64
	for w := 0; w < *conc; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Non-blocking stop check between scenarios — a
				// scenario in flight always runs to completion.
				select {
				case <-stop:
					return
				default:
				}
				fn := h.pickScenario()
				res := fn()
				results <- res
				atomic.AddInt64(&totalRuns, 1)
			}
		}()
	}
	// Reporter goroutine.
	all := []scenarioResult{}
	doneCollect := make(chan struct{})
	go func() {
		for r := range results {
			all = append(all, r)
		}
		close(doneCollect)
	}()
	log.Printf("[multitier] gateway=%s · concurrency=%d · duration=%v · corpus=%s",
		*gateway, *conc, *dur, *corpus)
	log.Printf("[multitier] scenarios: cold_search_email (35%%) · surge_fill (15%%) · swap (15%%) · repeat (15%%) · sms (10%%) · record_replay (10%%)")
	startWall := time.Now()
	time.Sleep(*dur)
	close(stop)
	wg.Wait()
	close(results)
	<-doneCollect
	wallElapsed := time.Since(startWall)
	report(all, wallElapsed, *conc)
}
// report prints the human-readable load summary to stdout and a
// machine-parsable JSON digest to stderr.
//
// Sections: totals, per-scenario latency table (fixed scenario order),
// failure-step breakdown, validator outcomes, JSON summary. Map keys
// are sorted before printing so output is deterministic run-to-run
// (Go map iteration order is randomized).
func report(all []scenarioResult, wall time.Duration, concurrency int) {
	fmt.Printf("\n══ multitier load report ══\n")
	fmt.Printf("wall: %v · concurrency=%d\n", wall.Round(time.Millisecond), concurrency)
	fmt.Printf("total scenarios: %d (%.1f/sec)\n", len(all), float64(len(all))/wall.Seconds())
	if len(all) == 0 {
		return
	}
	// Per-scenario aggregation.
	type scStats struct {
		name      string
		runs      int
		fails     int
		validator map[string]int // validator outcome → count
		failedAt  map[string]int // failing step → count
		lat       latStats
	}
	bucket := map[string]*scStats{}
	for _, r := range all {
		s, ok := bucket[r.name]
		if !ok {
			s = &scStats{name: r.name, validator: map[string]int{}, failedAt: map[string]int{}}
			bucket[r.name] = s
		}
		s.runs++
		s.lat.add(r.latency)
		if !r.success {
			s.fails++
			if r.failedAt != "" {
				s.failedAt[r.failedAt]++
			}
		}
		if r.validator != "" {
			s.validator[r.validator]++
		}
	}
	// Print per-scenario table in a fixed order so runs are
	// comparable side-by-side.
	names := []string{"cold_search_email", "surge_fill_validate", "profile_swap", "repeat_cache", "sms_validate", "playbook_record_replay"}
	fmt.Println()
	fmt.Printf("%-26s %8s %7s %8s %8s %8s %8s\n",
		"scenario", "runs", "fail%", "p50", "p95", "p99", "max")
	for _, n := range names {
		s, ok := bucket[n]
		if !ok {
			continue
		}
		failPct := 100 * float64(s.fails) / float64(s.runs)
		fmt.Printf("%-26s %8d %6.1f%% %8v %8v %8v %8v\n",
			n, s.runs, failPct,
			s.lat.percentile(0.50).Round(time.Microsecond),
			s.lat.percentile(0.95).Round(time.Microsecond),
			s.lat.percentile(0.99).Round(time.Microsecond),
			s.lat.percentile(1.00).Round(time.Microsecond),
		)
	}
	// Failure breakdown — step keys sorted for deterministic output.
	totalFails := 0
	for _, s := range bucket {
		totalFails += s.fails
	}
	if totalFails > 0 {
		fmt.Println("\n── failure-step breakdown ──")
		for _, n := range names {
			s, ok := bucket[n]
			if !ok || s.fails == 0 {
				continue
			}
			fmt.Printf(" %s:\n", n)
			steps := make([]string, 0, len(s.failedAt))
			for step := range s.failedAt {
				steps = append(steps, step)
			}
			sort.Strings(steps)
			for _, step := range steps {
				fmt.Printf(" %s: %d\n", step, s.failedAt[step])
			}
		}
	}
	// Validator pass/fail (keys sorted, as before).
	fmt.Println("\n── validator outcomes ──")
	for _, n := range names {
		s, ok := bucket[n]
		if !ok || len(s.validator) == 0 {
			continue
		}
		var keys []string
		for k := range s.validator {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		var parts []string
		for _, k := range keys {
			parts = append(parts, fmt.Sprintf("%s=%d", k, s.validator[k]))
		}
		fmt.Printf(" %-26s %s\n", n, strings.Join(parts, " · "))
	}
	// JSON summary on stderr for parsability.
	type summary struct {
		WallSec float64 `json:"wall_sec"`
		Total int `json:"total_scenarios"`
		Failures int `json:"failures"`
		PerScenario map[string]struct {
			Runs int `json:"runs"`
			Fails int `json:"fails"`
			P50Ms float64 `json:"p50_ms"`
			P99Ms float64 `json:"p99_ms"`
		} `json:"per_scenario"`
	}
	s := summary{
		WallSec: wall.Seconds(), Total: len(all), Failures: totalFails,
		PerScenario: map[string]struct {
			Runs int `json:"runs"`
			Fails int `json:"fails"`
			P50Ms float64 `json:"p50_ms"`
			P99Ms float64 `json:"p99_ms"`
		}{},
	}
	for _, n := range names {
		st, ok := bucket[n]
		if !ok {
			continue
		}
		s.PerScenario[n] = struct {
			Runs int `json:"runs"`
			Fails int `json:"fails"`
			P50Ms float64 `json:"p50_ms"`
			P99Ms float64 `json:"p99_ms"`
		}{
			Runs: st.runs, Fails: st.fails,
			P50Ms: float64(st.lat.percentile(0.50).Microseconds()) / 1000,
			P99Ms: float64(st.lat.percentile(0.99).Microseconds()) / 1000,
		}
	}
	enc, _ := json.MarshalIndent(s, "", " ")
	fmt.Fprintf(os.Stderr, "\n%s\n", enc)
}