// loadgen — sustained-traffic load generator for the G5 cutover // slice. Hammers a target URL with rotating bodies, captures // per-request latency, prints p50/p95/p99 + error breakdown + // throughput. // // Designed for the cutover-prep "is the Go substrate // production-ready under load?" probe. Not a full Vegeta/wrk // replacement — focused on the matrix/search shape with body // rotation to exercise embedder + corpus. // // Usage: // loadgen -url http://localhost:3700/_go/v1/matrix/search \ // -concurrency 10 -duration 60s // // Bodies are read from a file (one JSON-payload per line) and // rotated round-robin. -bodies-file defaults to a built-in mix // of 6 staffing queries. package main import ( "bytes" "encoding/json" "flag" "fmt" "io" "log" "net/http" "os" "sort" "strings" "sync" "sync/atomic" "time" ) // defaultBodies is the built-in mix when no -bodies-file is given. // 6 distinct queries across roles + geos so embed cache + cosine // retrieval both get exercised. Each is a real fill_events-shape. 
var defaultBodies = []string{
	`{"query_text":"Need 3 Forklift Operators in Aurora IL for Parallel Machining","query_role":"Forklift Operator","corpora":["workers"],"k":5,"per_corpus_k":5,"use_playbook":false}`,
	`{"query_text":"Need 1 CNC Operator in Detroit MI for Beacon Freight","query_role":"CNC Operator","corpora":["workers"],"k":5,"per_corpus_k":5,"use_playbook":false}`,
	`{"query_text":"Need 5 Warehouse Associates in Indianapolis IN at 12:00 for Midway Distribution","query_role":"Warehouse Associates","corpora":["workers"],"k":5,"per_corpus_k":5,"use_playbook":false}`,
	`{"query_text":"Need 2 Pickers in Joliet IL for Parallel Machining","query_role":"Pickers","corpora":["workers"],"k":5,"per_corpus_k":5,"use_playbook":false}`,
	`{"query_text":"Need 4 Loaders in Indianapolis IN starting at 12:00 for Midway Distribution","query_role":"Loaders","corpora":["workers"],"k":5,"per_corpus_k":5,"use_playbook":false}`,
	`{"query_text":"Need 1 Shipping Clerk in Flint MI for Pioneer Assembly","query_role":"Shipping Clerk","corpora":["workers"],"k":5,"per_corpus_k":5,"use_playbook":false}`,
}

// result is the outcome of one request: wall latency, HTTP status
// (0 when the request never completed), and the transport error if any.
type result struct {
	latency time.Duration
	status  int
	err     error
}

// parseBodies splits raw bodies-file content into one JSON payload per
// line, trimming whitespace and skipping blank lines and '#' comments.
// Returns nil when no usable lines remain.
func parseBodies(data string) []string {
	var bodies []string
	for _, line := range strings.Split(data, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		bodies = append(bodies, line)
	}
	return bodies
}

// percentile returns the value at pct (0..1] from an ascending-sorted
// slice using the truncating nearest-rank method (idx = int(pct*n),
// clamped). Returns 0 for an empty slice. Shared by the text report and
// the JSON summary so the two can never disagree.
func percentile(sorted []time.Duration, pct float64) time.Duration {
	if len(sorted) == 0 {
		return 0
	}
	idx := int(pct * float64(len(sorted)))
	if idx >= len(sorted) {
		idx = len(sorted) - 1
	}
	return sorted[idx]
}

func main() {
	url := flag.String("url", "http://localhost:3700/_go/v1/matrix/search", "target URL (defaults to Bun /_go/v1/matrix/search; pass http://localhost:3110/v1/matrix/search to hit Go gateway directly without Bun)")
	concurrency := flag.Int("concurrency", 10, "concurrent workers")
	duration := flag.Duration("duration", 30*time.Second, "load duration")
	bodiesFile := flag.String("bodies-file", "", "file with one JSON body per line (defaults to built-in 6-query mix)")
	timeout := flag.Duration("timeout", 30*time.Second, "per-request timeout")
	flag.Parse()

	bodies := defaultBodies
	if *bodiesFile != "" {
		data, err := os.ReadFile(*bodiesFile)
		if err != nil {
			log.Fatalf("read bodies-file: %v", err)
		}
		bodies = parseBodies(string(data))
		if len(bodies) == 0 {
			log.Fatalf("no bodies in %s", *bodiesFile)
		}
	}

	hc := &http.Client{Timeout: *timeout}
	results := make(chan result, *concurrency*2)
	stop := make(chan struct{})
	var sent int64 // completed HTTP exchanges (transport errors excluded)
	var wg sync.WaitGroup

	for w := 0; w < *concurrency; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// Start each worker at a different offset so the body mix
			// is spread across workers, not phase-locked.
			i := workerID
			for {
				select {
				case <-stop:
					return
				default:
				}
				body := bodies[i%len(bodies)]
				i++
				start := time.Now()
				req, err := http.NewRequest("POST", *url, bytes.NewReader([]byte(body)))
				if err != nil {
					// Malformed -url: req is nil and every iteration would
					// fail identically, so record one error and bail out
					// instead of panicking on req.Header.Set.
					results <- result{latency: time.Since(start), status: 0, err: err}
					return
				}
				req.Header.Set("Content-Type", "application/json")
				resp, err := hc.Do(req)
				latency := time.Since(start)
				if err != nil {
					results <- result{latency: latency, status: 0, err: err}
					continue
				}
				// Drain + close so the transport can reuse the connection.
				_, _ = io.Copy(io.Discard, resp.Body)
				resp.Body.Close()
				results <- result{latency: latency, status: resp.StatusCode}
				atomic.AddInt64(&sent, 1)
			}
		}(w)
	}

	// Reporter goroutine drains results; main goroutine times out the run.
	all := make([]result, 0, 1024)
	doneCollect := make(chan struct{})
	go func() {
		for r := range results {
			all = append(all, r)
		}
		close(doneCollect)
	}()

	log.Printf("[loadgen] hammering %s · concurrency=%d · duration=%v · bodies=%d", *url, *concurrency, *duration, len(bodies))
	startAll := time.Now()
	time.Sleep(*duration)
	close(stop)
	wg.Wait()
	close(results)
	<-doneCollect
	wallElapsed := time.Since(startAll)
	log.Printf("[loadgen] done: %d HTTP responses received", atomic.LoadInt64(&sent))
	report(*url, all, wallElapsed, *concurrency, len(bodies))
}

// report prints a human-readable load summary to stdout and a
// machine-parsable JSON summary to stderr. Latency statistics cover
// 2xx responses only; transport errors are tallied separately from
// HTTP status codes.
func report(url string, all []result, wall time.Duration, concurrency, bodies int) {
	fmt.Printf("\n══ load report ══\n")
	fmt.Printf("url: %s\n", url)
	fmt.Printf("wall: %v · concurrency=%d · bodies=%d\n", wall.Round(time.Millisecond), concurrency, bodies)
	fmt.Printf("requests: %d\n", len(all))
	if len(all) == 0 {
		return
	}
	fmt.Printf("rps: %.1f\n", float64(len(all))/wall.Seconds())

	// Status breakdown.
	statuses := map[int]int{}
	errs := 0
	successLat := make([]time.Duration, 0, len(all))
	for _, r := range all {
		if r.err != nil {
			errs++
			continue
		}
		statuses[r.status]++
		if r.status/100 == 2 {
			successLat = append(successLat, r.latency)
		}
	}
	fmt.Printf("\nstatus codes:\n")
	codes := make([]int, 0, len(statuses))
	for c := range statuses {
		codes = append(codes, c)
	}
	sort.Ints(codes)
	for _, c := range codes {
		fmt.Printf(" %d: %d (%.1f%%)\n", c, statuses[c], 100*float64(statuses[c])/float64(len(all)))
	}
	if errs > 0 {
		fmt.Printf(" err (transport): %d (%.1f%%)\n", errs, 100*float64(errs)/float64(len(all)))
	}

	// Mean is computed once here and reused by the JSON summary below.
	var meanLat time.Duration
	if len(successLat) > 0 {
		sort.Slice(successLat, func(i, j int) bool { return successLat[i] < successLat[j] })
		var totalLat time.Duration
		for _, l := range successLat {
			totalLat += l
		}
		meanLat = totalLat / time.Duration(len(successLat))
		fmt.Printf("\nlatency (2xx only, n=%d):\n", len(successLat))
		fmt.Printf(" min: %v\n", successLat[0].Round(time.Microsecond))
		fmt.Printf(" p50: %v\n", percentile(successLat, 0.50).Round(time.Microsecond))
		fmt.Printf(" p95: %v\n", percentile(successLat, 0.95).Round(time.Microsecond))
		fmt.Printf(" p99: %v\n", percentile(successLat, 0.99).Round(time.Microsecond))
		fmt.Printf(" max: %v\n", successLat[len(successLat)-1].Round(time.Microsecond))
		fmt.Printf(" mean: %v\n", meanLat.Round(time.Microsecond))
	}

	// Sample a few error messages so operator can see what broke.
	if errs > 0 {
		fmt.Printf("\nfirst 3 transport errors:\n")
		shown := 0
		for _, r := range all {
			if r.err != nil {
				fmt.Printf(" - %v (after %v)\n", r.err, r.latency.Round(time.Microsecond))
				shown++
				if shown >= 3 {
					break
				}
			}
		}
	}

	// JSON summary on stderr for parsability.
	type summary struct {
		URL       string         `json:"url"`
		WallSec   float64        `json:"wall_sec"`
		Requests  int            `json:"requests"`
		RPS       float64        `json:"rps"`
		Errors    int            `json:"errors"`
		Codes     map[string]int `json:"codes"`
		LatencyMs struct {
			Min  float64 `json:"min"`
			P50  float64 `json:"p50"`
			P95  float64 `json:"p95"`
			P99  float64 `json:"p99"`
			Max  float64 `json:"max"`
			Mean float64 `json:"mean"`
		} `json:"latency_ms"`
	}
	codesS := map[string]int{}
	for c, n := range statuses {
		codesS[fmt.Sprintf("%d", c)] = n
	}
	s := summary{
		URL:      url,
		WallSec:  wall.Seconds(),
		Requests: len(all),
		RPS:      float64(len(all)) / wall.Seconds(),
		Errors:   errs,
		Codes:    codesS,
	}
	if len(successLat) > 0 {
		toMs := func(d time.Duration) float64 { return float64(d.Nanoseconds()) / 1e6 }
		s.LatencyMs.Min = toMs(successLat[0])
		s.LatencyMs.Max = toMs(successLat[len(successLat)-1])
		s.LatencyMs.P50 = toMs(percentile(successLat, 0.50))
		s.LatencyMs.P95 = toMs(percentile(successLat, 0.95))
		s.LatencyMs.P99 = toMs(percentile(successLat, 0.99))
		s.LatencyMs.Mean = toMs(meanLat)
	}
	enc, err := json.MarshalIndent(s, "", " ")
	if err != nil {
		log.Printf("marshal summary: %v", err)
		return
	}
	fmt.Fprintf(os.Stderr, "\n%s\n", enc)
}