From 1f700e731d4fcb7557ba656818b1e7a16a7ff026 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 29 Apr 2026 02:31:30 -0500 Subject: [PATCH] =?UTF-8?q?Staffing=20scale=20test:=20full=20500K=20throug?= =?UTF-8?q?h=20gateway=20=E2=86=92=20embedd=20=E2=86=92=20vectord=20pipeli?= =?UTF-8?q?ne?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scripts/staffing_500k/main.go: driver that reads workers_500k.csv, embeds combined-text per worker via /v1/embed, adds to vectord index "workers_500k", runs canonical staffing queries against the populated index. Reproducible end-to-end test of the staffing co-pilot pipeline at production scale. Run results (2026-04-29 ~02:30): 500,000 vectors ingested in 35m 36s (~234/sec avg) vectord peak RSS 4.5 GB (~9 KB/vector incl. HNSW graph) Query latency: embed 40-59ms + search 1-3ms = ~50ms end-to-end GPU avg ~65% (Ollama not the bottleneck — vectord Add is) Semantic recall on canonical queries: "electrician with industrial wiring": top 2 are literal Electricians (d=0.30) "CNC operator with first article": Assembler / Quality Techs (adjacent, d=0.24) "forklift driver OSHA-30": warehouse roles (d=0.33) "warehouse picker night shift bilingual": Material Handlers (d=0.31) "dental hygienist": Production Workers at d=0.49+ — correctly LOW-similarity, signals "no dental hygienists in this manufacturing dataset" rather than hallucinating a fake match. Documented gaps: - storaged's 256 MiB PUT cap blocks single-file LHV1 persistence above ~150K vectors at d=768. Test ran with persistence disabled. - vectord Add is RWMutex-serialized — with GPU at 65% util this is the throughput cap. Concurrent Adds would be 2-3x faster but require careful audit of coder/hnsw thread-safety (G1 scrum documented two known quirks). PHASE_G0_KICKOFF.md gains a "Staffing scale test" section with full metrics + the gaps-surfaced list. The architectural payoff is real: six binaries, one HTTP route, ~50ms from text query to top-K semantically-relevant workers across 500K records. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/PHASE_G0_KICKOFF.md | 99 ++++++++++ scripts/staffing_500k/main.go | 338 ++++++++++++++++++++++++++++++++++ 2 files changed, 437 insertions(+) create mode 100644 scripts/staffing_500k/main.go diff --git a/docs/PHASE_G0_KICKOFF.md b/docs/PHASE_G0_KICKOFF.md index edcb6e0..d90a727 100644 --- a/docs/PHASE_G0_KICKOFF.md +++ b/docs/PHASE_G0_KICKOFF.md @@ -1316,3 +1316,102 @@ Plus shared packages: `internal/storeclient`, `internal/catalogclient`, g1, g1p, g2 — 9 acceptance gates, all PASS. `scripts/g1_smoke.toml` disables vectord persistence specifically for the in-memory API smoke to avoid rehydrate-from-storaged contamination. + +--- + +## Staffing scale test (2026-04-29) — full 500K through the pipeline + +After G2 (embedd) shipped, drove the entire production-scale dataset +through the staffing co-pilot pipeline as the architectural payoff +test. Driver: `scripts/staffing_500k/main.go` — reads +`workers_500k.csv`, builds combined search text per worker +(role + city + skills + certs + resume_text), embeds via +`/v1/embed`, adds to vectord index `workers_500k`, runs five +canonical staffing queries. + +### Setup +- Services: gateway, storaged (cap), vectord (`storaged_url=""` — + persistence disabled because encoded HNSW for 500K exceeds + storaged's 256 MiB PUT cap), embedd. catalogd/ingestd/queryd not + needed for this test. 
+- Driver concurrency: 8 parallel embed workers, embed batch size 16, + add batch size effectively 16 (one batch per embed call). +- Embedding model: `nomic-embed-text` (768-d) on RTX A4000. + +### Throughput at scale + +| Metric | Value | +|---|---| +| Vectors ingested | 500,000 | +| Wall time | 35m 36s | +| Average rate | ~234 vectors/sec end-to-end | +| Sustained rate (steady state) | 200-215/sec, oscillating with HNSW level transitions | +| GPU utilization | ~65% average (Ollama not the bottleneck) | +| vectord peak RSS | 4.5 GB at 500K (~9 KB/vector incl. HNSW graph + Go runtime) | +| Memory growth pattern | Linear ~9 MB per 1000 vectors | + +The bottleneck is firmly vectord's HNSW Add — log(N) with growing +constant factors. GPU sat at ~70% during steady state, dropping to +30-50% during HNSW level transitions. The driver's 8-way embed +concurrency could push more if vectord's Add weren't serialized +through one RWMutex. + +### Query latency at 500K + +All five test queries: +- Embed: 40-59ms (Ollama doing real work on the GPU) +- Search: **1-3ms** (HNSW with default M=16, EfSearch=20) +- End-to-end through gateway: ~50ms total + +This is the architectural payoff: the actual product latency is +embed-bound, not search-bound. Caching common-query embeddings +would be a real win. + +### Semantic recall + +Five canonical staffing queries, top hit per query: + +| Query | Top result | Distance | Notes | +|---|---|---|---| +| `electrician with industrial wiring background` | **Electrician** at Mattoon IL | **0.30** | Top 2 are both literal Electricians | +| `CNC operator with first article and gauge R&R experience` | Assembler at Chicago IL | 0.24 | Top 2-3 are Quality Techs (adjacent manufacturing) | +| `forklift driver OSHA-30 certified warehouse` | Inventory Clerk at St. Louis | 0.33 | Mix of Material Handler + Warehouse Associate in top 5 | +| `warehouse picker night shift bilingual` | Material Handler at Evansville | 0.31 | Adjacent warehouse roles | +| `dental hygienist three years experience` | Production Worker at Madison | **0.49** | **Correctly low-similarity** — there are no dental hygienists in the manufacturing dataset; system signals "not a real match" rather than hallucinating | + +The "dental hygienist" result is the most architecturally honest: +distance 0.49+ across the top-5 says "your query doesn't fit this +dataset" instead of returning bogus matches. Small distance ≈ real +match; big distance = no match available, judge accordingly. + +### What this proves + +1. **The full Lakehouse-Go staffing pipeline works at production + scale.** 500K real staffing records → 50ms similarity query + end-to-end through six binaries (gateway → embedd → Ollama → + gateway → vectord → response). +2. **HNSW vectord scales to 500K vectors with default params.** + 4.5 GB resident, 1-3ms search latency. Persistence (G1P) wasn't + used because the encoded file would exceed storaged's PUT cap; + that's a real G3+ cleanup but not a G1P bug. +3. **The Provider-interface design held.** Swapping Ollama for + OpenAI/Voyage in G3 would change exactly one file in + `internal/embed/`; the rest of the stack is unaffected. + +### Gaps surfaced + +- **Persistence at scale**: storaged's 256 MiB cap blocks single- + file `LHV1` blobs > ~150K vectors at d=768. Either bump the cap + for vector workloads, split the file across multiple keys, or + use multipart uploads in storaged. Defer to G3+ — the test ran + with persistence disabled. +- **vectord Add is single-threaded via RWMutex**. 
With Ollama's + GPU only at 65% and our embed concurrency at 8, the Add lock is + the throughput cap. SetMaxOpenConns(1)-style serialization made + sense for G1 (avoid HNSW concurrency hazards) but the staffing + scale test shows the cost: ~30-40 minutes per 500K dataset. + Concurrent Adds would be a 2-3x speedup if the library can + handle them; library has known thread-safety quirks (G1 scrum + documented two), so this needs careful audit before lifting. +- **No caching of recent embed results**. Five queries each cost a + fresh ~50ms Ollama call. Common-query cache is post-G2. diff --git a/scripts/staffing_500k/main.go b/scripts/staffing_500k/main.go new file mode 100644 index 0000000..c06ecf5 --- /dev/null +++ b/scripts/staffing_500k/main.go @@ -0,0 +1,338 @@ +// Staffing co-pilot scale test driver. +// +// Pipeline: workers_500k.csv → /v1/embed (batched, parallel) → +// /v1/vectors/index/workers_500k/add (batched). Then runs a handful +// of semantic queries against the populated index and prints the +// top hits — the human-readable check that "find workers like X" +// actually returns relevant workers. +// +// Designed to be re-run; index gets DELETEd at the start so leftover +// state from prior runs doesn't bias recall. +package main + +import ( + "bytes" + "context" + "encoding/csv" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "strings" + "sync" + "sync/atomic" + "time" +) + +const ( + indexName = "workers_500k" + dim = 768 + + embedConcurrency = 8 // matches Ollama-on-A4000 sweet spot + embedBatchSize = 16 // texts per /v1/embed call + addBatchSize = 1000 // items per /v1/vectors/index/add call + + maxColPhone = 4 + maxColCity = 5 + maxColState = 6 + maxColRole = 2 + maxColSkills = 8 + maxColCerts = 9 + maxColResume = 17 + colWorkerID = 0 + colName = 1 +) + +func main() { + var ( + gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL") + csvPath = flag.String("csv", "/tmp/rs/workers_500k.csv", "path to workers CSV") + limit = flag.Int("limit", 0, "limit rows (0 = all)") + queries = flag.String("queries", "default", "default | ") + skipPop = flag.Bool("skip-populate", false, "skip embed+add, only run queries") + ) + flag.Parse() + + hc := &http.Client{Timeout: 5 * time.Minute} + + if !*skipPop { + // Tear down any prior index so recall is on a fresh build. + fmt.Printf("[sc] DELETE %s/v1/vectors/index/%s (idempotent cleanup)\n", *gateway, indexName) + _ = httpDelete(hc, *gateway+"/v1/vectors/index/"+indexName) + + // Create the index. + body := map[string]any{"name": indexName, "dimension": dim, "distance": "cosine"} + if code, msg := httpPostJSON(hc, *gateway+"/v1/vectors/index", body); code != 201 { + log.Fatalf("create index: %d %s", code, msg) + } + fmt.Println("[sc] created index workers_500k dim=768 cosine") + + t0 := time.Now() + if err := populate(hc, *gateway, *csvPath, *limit); err != nil { + log.Fatal(err) + } + fmt.Printf("[sc] populate complete in %v\n", time.Since(t0)) + } + + // Validate semantic queries. 
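+	// Each query pays for a fresh embed call (~40-59ms in the scale-test
+	// run) plus a 1-3ms HNSW search; there is no query-embedding cache
+	// yet (called out as a post-G2 gap in PHASE_G0_KICKOFF.md).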
+ qs := defaultQueries() + if *queries != "default" { + qs = strings.Split(*queries, ";") + } + for _, q := range qs { + runQuery(hc, *gateway, q) + } +} + +func defaultQueries() []string { + return []string{ + "CNC operator with first article and gauge R&R experience", + "forklift driver OSHA-30 certified warehouse", + "warehouse picker night shift bilingual", + "dental hygienist three years experience", + "electrician with industrial wiring background", + } +} + +func populate(hc *http.Client, gateway, csvPath string, limit int) error { + f, err := os.Open(csvPath) + if err != nil { + return fmt.Errorf("open csv: %w", err) + } + defer f.Close() + cr := csv.NewReader(f) + cr.FieldsPerRecord = -1 + if _, err := cr.Read(); err != nil { // header + return fmt.Errorf("read header: %w", err) + } + + type job struct { + ids []string + texts []string + metas []json.RawMessage + } + + jobs := make(chan job, embedConcurrency*2) + var wg sync.WaitGroup + var ( + totalEmbedded int64 + totalAdded int64 + ) + + for i := 0; i < embedConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := range jobs { + vecs, err := embedBatch(hc, gateway, j.texts) + if err != nil { + log.Printf("embed batch (%d items): %v", len(j.texts), err) + continue + } + atomic.AddInt64(&totalEmbedded, int64(len(vecs))) + if err := addBatch(hc, gateway, j.ids, vecs, j.metas); err != nil { + log.Printf("add batch (%d items): %v", len(j.ids), err) + continue + } + atomic.AddInt64(&totalAdded, int64(len(j.ids))) + } + }() + } + + progressTicker := time.NewTicker(10 * time.Second) + go func() { + for range progressTicker.C { + fmt.Printf("[sc] progress: embedded=%d added=%d\n", + atomic.LoadInt64(&totalEmbedded), atomic.LoadInt64(&totalAdded)) + } + }() + defer progressTicker.Stop() + + curIDs := make([]string, 0, embedBatchSize) + curTexts := make([]string, 0, embedBatchSize) + curMetas := make([]json.RawMessage, 0, embedBatchSize) + rows := 0 + for { + row, err := cr.Read() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("csv read row %d: %w", rows, err) + } + if len(row) <= maxColResume { + continue + } + id := strings.TrimSpace(row[colWorkerID]) + text := buildSearchText(row) + meta, _ := json.Marshal(map[string]any{ + "name": row[colName], + "role": row[maxColRole], + "city": row[maxColCity], + "state": row[maxColState], + }) + curIDs = append(curIDs, "w-"+id) + curTexts = append(curTexts, text) + curMetas = append(curMetas, meta) + + if len(curIDs) >= embedBatchSize { + jobs <- job{ids: curIDs, texts: curTexts, metas: curMetas} + curIDs = make([]string, 0, embedBatchSize) + curTexts = make([]string, 0, embedBatchSize) + curMetas = make([]json.RawMessage, 0, embedBatchSize) + } + rows++ + if limit > 0 && rows >= limit { + break + } + } + if len(curIDs) > 0 { + jobs <- job{ids: curIDs, texts: curTexts, metas: curMetas} + } + close(jobs) + wg.Wait() + + fmt.Printf("[sc] final: scanned=%d embedded=%d added=%d\n", + rows, atomic.LoadInt64(&totalEmbedded), atomic.LoadInt64(&totalAdded)) + return nil +} + +// buildSearchText concatenates the staffing-relevant columns into +// the text that gets embedded. Order: role first (most semantically +// dense), then skills + certs, city/state, finally the prose +// resume_text. Embedding models weight earlier tokens slightly more. 
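+// Illustrative output shape (skills/certs values below are made up for
+// the example, not taken from the dataset):
+// "Electrician in Mattoon, IL. Skills: industrial wiring.
+// Certifications: OSHA-10. <resume_text>".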
+func buildSearchText(row []string) string { + var b strings.Builder + b.WriteString(row[maxColRole]) + b.WriteString(" in ") + b.WriteString(row[maxColCity]) + b.WriteString(", ") + b.WriteString(row[maxColState]) + b.WriteString(". Skills: ") + b.WriteString(row[maxColSkills]) + b.WriteString(". Certifications: ") + b.WriteString(row[maxColCerts]) + b.WriteString(". ") + b.WriteString(row[maxColResume]) + return b.String() +} + +func embedBatch(hc *http.Client, gateway string, texts []string) ([][]float32, error) { + body := map[string]any{"texts": texts} + bs, _ := json.Marshal(body) + req, _ := http.NewRequest(http.MethodPost, gateway+"/v1/embed", bytes.NewReader(bs)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return nil, fmt.Errorf("embed status %d: %s", resp.StatusCode, string(preview)) + } + var er struct { + Vectors [][]float32 `json:"vectors"` + } + if err := json.NewDecoder(resp.Body).Decode(&er); err != nil { + return nil, err + } + return er.Vectors, nil +} + +type addItem struct { + ID string `json:"id"` + Vector []float32 `json:"vector"` + Metadata json.RawMessage `json:"metadata"` +} + +func addBatch(hc *http.Client, gateway string, ids []string, vecs [][]float32, metas []json.RawMessage) error { + items := make([]addItem, len(ids)) + for i := range ids { + items[i] = addItem{ID: ids[i], Vector: vecs[i], Metadata: metas[i]} + } + bs, _ := json.Marshal(map[string]any{"items": items}) + req, _ := http.NewRequest(http.MethodPost, + gateway+"/v1/vectors/index/"+indexName+"/add", bytes.NewReader(bs)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return fmt.Errorf("add status %d: %s", resp.StatusCode, string(preview)) + } + return nil +} + +func runQuery(hc *http.Client, gateway, q string) { + t0 := time.Now() + // 1. Embed the query. + vecs, err := embedBatch(hc, gateway, []string{q}) + if err != nil || len(vecs) == 0 { + fmt.Printf("[sc] query %q: embed err: %v\n", q, err) + return + } + embedDur := time.Since(t0) + t1 := time.Now() + // 2. Search. + body := map[string]any{"vector": vecs[0], "k": 5} + bs, _ := json.Marshal(body) + req, _ := http.NewRequest(http.MethodPost, + gateway+"/v1/vectors/index/"+indexName+"/search", bytes.NewReader(bs)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + fmt.Printf("[sc] query %q: search err: %v\n", q, err) + return + } + defer resp.Body.Close() + searchDur := time.Since(t1) + var sr struct { + Results []struct { + ID string `json:"id"` + Distance float32 `json:"distance"` + Metadata json.RawMessage `json:"metadata"` + } `json:"results"` + } + if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil { + fmt.Printf("[sc] query %q: decode err: %v\n", q, err) + return + } + fmt.Printf("\n[sc] %q (embed=%v search=%v)\n", q, embedDur.Round(time.Millisecond), searchDur.Round(time.Millisecond)) + for i, r := range sr.Results { + fmt.Printf(" %d. 
%s d=%.4f %s\n", i+1, r.ID, r.Distance, string(r.Metadata)) + } +} + +func httpPostJSON(hc *http.Client, url string, body any) (int, string) { + bs, _ := json.Marshal(body) + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader(bs)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return 0, err.Error() + } + defer resp.Body.Close() + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return resp.StatusCode, string(preview) +} + +func httpDelete(hc *http.Client, url string) error { + req, _ := http.NewRequest(http.MethodDelete, url, nil) + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + io.Copy(io.Discard, resp.Body) + return nil +} + +// keep context.Background reachable in case future paths use it +var _ = context.Background
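+
+// embedCache is a minimal sketch of the common-query embedding cache
+// called out as a post-G2 gap in the PHASE_G0_KICKOFF.md section this
+// commit adds. It is NOT wired into this driver; the type and method
+// names are illustrative assumptions, not part of any service API.
+// The idea: key recent query embeddings by query text so a repeated
+// query skips the ~50ms Ollama round trip and pays only the 1-3ms
+// HNSW search.
+type embedCache struct {
+	mu sync.Mutex
+	m  map[string][]float32
+}
+
+// lookup returns a previously cached embedding for q, if any.
+func (c *embedCache) lookup(q string) ([]float32, bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	v, ok := c.m[q]
+	return v, ok
+}
+
+// store remembers the embedding for q; the map is grown lazily.
+func (c *embedCache) store(q string, vec []float32) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.m == nil {
+		c.m = make(map[string][]float32)
+	}
+	c.m[q] = vec
+}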