root 1f700e731d Staffing scale test: full 500K through gateway → embedd → vectord pipeline
scripts/staffing_500k/main.go: driver that reads workers_500k.csv,
embeds combined-text per worker via /v1/embed, adds to vectord index
"workers_500k", runs canonical staffing queries against the populated
index. Reproducible end-to-end test of the staffing co-pilot pipeline
at production scale.

Run results (2026-04-29 ~02:30):
  500,000 vectors ingested in 35m 36s (~234/sec avg)
  vectord peak RSS 4.5 GB (~9 KB/vector incl. HNSW graph)
  Query latency: embed 40-59ms + search 1-3ms = ~50ms end-to-end
  GPU avg ~65% (Ollama not the bottleneck — vectord Add is)

Semantic recall on canonical queries:
  "electrician with industrial wiring": top 2 are literal Electricians (d=0.30)
  "CNC operator with first article": Assembler / Quality Techs (adjacent, d=0.24)
  "forklift driver OSHA-30": warehouse roles (d=0.33)
  "warehouse picker night shift bilingual": Material Handlers (d=0.31)
  "dental hygienist": Production Workers at d=0.49+ — correctly
    LOW-similarity, signals "no dental hygienists in this manufacturing
    dataset" rather than hallucinating a fake match.

Documented gaps:
  - storaged's 256 MiB PUT cap blocks single-file LHV1 persistence
    above ~150K vectors at d=768. Test ran with persistence disabled.
  - vectord Add is RWMutex-serialized — with GPU at 65% util this is
    the throughput cap. Concurrent Adds would be 2-3x faster but
    require careful audit of coder/hnsw thread-safety (G1 scrum
    documented two known quirks).

PHASE_G0_KICKOFF.md gains a "Staffing scale test" section with full
metrics + the gaps-surfaced list. The architectural payoff is real:
six binaries, one HTTP route, ~50ms from text query to top-K
semantically-relevant workers across 500K records.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 02:31:30 -05:00

339 lines
9.4 KiB
Go

// Staffing co-pilot scale test driver.
//
// Pipeline: workers_500k.csv → /v1/embed (batched, parallel) →
// /v1/vectors/index/workers_500k/add (batched). Then runs a handful
// of semantic queries against the populated index and prints the
// top hits — the human-readable check that "find workers like X"
// actually returns relevant workers.
//
// Designed to be re-run; index gets DELETEd at the start so leftover
// state from prior runs doesn't bias recall.
package main
import (
"bytes"
"context"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
// indexName is the vectord index this driver tears down, repopulates,
// and queries.
indexName = "workers_500k"
// dim must match the embedding model's output dimension (see the
// index-create call in main).
dim = 768
embedConcurrency = 8 // matches Ollama-on-A4000 sweet spot
embedBatchSize = 16 // texts per /v1/embed call
// NOTE(review): addBatchSize is declared but never used — populate
// sends adds in embed-sized batches of 16, not 1000. Confirm intent.
addBatchSize = 1000 // items per /v1/vectors/index/add call
// CSV column indices into workers_500k.csv.
// NOTE(review): the "maxCol" prefix reads like a rename artifact —
// these are plain column indices, not maxima. Only maxColResume is
// used as a maximum (the row-length guard in populate).
maxColPhone = 4
maxColCity = 5
maxColState = 6
maxColRole = 2
maxColSkills = 8
maxColCerts = 9
maxColResume = 17
colWorkerID = 0
colName = 1
)
// main wires the full pipeline: optionally recreate + populate the
// vectord index from the workers CSV, then run semantic queries
// against it and print the top hits.
func main() {
	var (
		gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
		csvPath = flag.String("csv", "/tmp/rs/workers_500k.csv", "path to workers CSV")
		limit   = flag.Int("limit", 0, "limit rows (0 = all)")
		queries = flag.String("queries", "default", "default | <semicolon-separated query strings>")
		skipPop = flag.Bool("skip-populate", false, "skip embed+add, only run queries")
	)
	flag.Parse()

	// Generous timeout: a large add against a cold HNSW graph can be slow.
	hc := &http.Client{Timeout: 5 * time.Minute}

	if !*skipPop {
		// Tear down any prior index so recall is on a fresh build.
		// Error deliberately ignored: a 404 on the first run is expected.
		fmt.Printf("[sc] DELETE %s/v1/vectors/index/%s (idempotent cleanup)\n", *gateway, indexName)
		_ = httpDelete(hc, *gateway+"/v1/vectors/index/"+indexName)

		// Create the index.
		body := map[string]any{"name": indexName, "dimension": dim, "distance": "cosine"}
		if code, msg := httpPostJSON(hc, *gateway+"/v1/vectors/index", body); code != http.StatusCreated {
			log.Fatalf("create index: %d %s", code, msg)
		}
		fmt.Println("[sc] created index workers_500k dim=768 cosine")

		t0 := time.Now()
		if err := populate(hc, *gateway, *csvPath, *limit); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("[sc] populate complete in %v\n", time.Since(t0))
	}

	// Validate semantic queries (canned set, or the user-supplied list).
	qs := defaultQueries()
	if *queries != "default" {
		// Robustness fix: trim whitespace around each query and drop
		// empty segments so input like "a; b;" doesn't embed blanks.
		qs = qs[:0]
		for _, q := range strings.Split(*queries, ";") {
			if q = strings.TrimSpace(q); q != "" {
				qs = append(qs, q)
			}
		}
	}
	for _, q := range qs {
		runQuery(hc, *gateway, q)
	}
}
// defaultQueries returns the canned set of canonical staffing queries
// used to eyeball semantic recall after a populate run.
func defaultQueries() []string {
	qs := make([]string, 0, 5)
	qs = append(qs,
		"CNC operator with first article and gauge R&R experience",
		"forklift driver OSHA-30 certified warehouse",
		"warehouse picker night shift bilingual",
		"dental hygienist three years experience",
		"electrician with industrial wiring background",
	)
	return qs
}
// populate streams the CSV through embed workers into the vectord index.
//
// Layout: this goroutine reads CSV rows and batches them into jobs;
// embedConcurrency workers each embed a batch via /v1/embed and add the
// resulting vectors via the index's /add route. A failed batch is logged
// and skipped so one bad row range doesn't abort a multi-hour run.
//
// Fixes over the original:
//   - The progress goroutine now exits when populate returns. Ranging
//     over a stopped Ticker's channel blocks forever (Stop does not
//     close the channel), so the old goroutine leaked.
//   - Workers are always drained: a CSV read error used to return early
//     with the jobs channel still open, leaking all worker goroutines.
func populate(hc *http.Client, gateway, csvPath string, limit int) error {
	f, err := os.Open(csvPath)
	if err != nil {
		return fmt.Errorf("open csv: %w", err)
	}
	defer f.Close()

	cr := csv.NewReader(f)
	cr.FieldsPerRecord = -1 // rows may have a variable number of columns
	if _, err := cr.Read(); err != nil { // discard header row
		return fmt.Errorf("read header: %w", err)
	}

	type job struct {
		ids   []string
		texts []string
		metas []json.RawMessage
	}
	jobs := make(chan job, embedConcurrency*2)

	var (
		wg            sync.WaitGroup
		totalEmbedded int64
		totalAdded    int64
	)
	for i := 0; i < embedConcurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				vecs, err := embedBatch(hc, gateway, j.texts)
				if err != nil {
					log.Printf("embed batch (%d items): %v", len(j.texts), err)
					continue
				}
				atomic.AddInt64(&totalEmbedded, int64(len(vecs)))
				if err := addBatch(hc, gateway, j.ids, vecs, j.metas); err != nil {
					log.Printf("add batch (%d items): %v", len(j.ids), err)
					continue
				}
				atomic.AddInt64(&totalAdded, int64(len(j.ids)))
			}
		}()
	}

	// Progress reporter; told to exit via done so it doesn't leak.
	done := make(chan struct{})
	go func() {
		t := time.NewTicker(10 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-done:
				return
			case <-t.C:
				fmt.Printf("[sc] progress: embedded=%d added=%d\n",
					atomic.LoadInt64(&totalEmbedded), atomic.LoadInt64(&totalAdded))
			}
		}
	}()

	curIDs := make([]string, 0, embedBatchSize)
	curTexts := make([]string, 0, embedBatchSize)
	curMetas := make([]json.RawMessage, 0, embedBatchSize)
	rows := 0

	// Read loop in a closure so every exit path (EOF, limit, error)
	// falls through to the same close/wait/cleanup sequence below.
	readErr := func() error {
		for {
			row, err := cr.Read()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return fmt.Errorf("csv read row %d: %w", rows, err)
			}
			if len(row) <= maxColResume {
				continue // short row: can't build search text, skip it
			}
			id := strings.TrimSpace(row[colWorkerID])
			text := buildSearchText(row)
			meta, _ := json.Marshal(map[string]any{ // string values: cannot fail
				"name":  row[colName],
				"role":  row[maxColRole],
				"city":  row[maxColCity],
				"state": row[maxColState],
			})
			curIDs = append(curIDs, "w-"+id)
			curTexts = append(curTexts, text)
			curMetas = append(curMetas, meta)
			if len(curIDs) >= embedBatchSize {
				jobs <- job{ids: curIDs, texts: curTexts, metas: curMetas}
				curIDs = make([]string, 0, embedBatchSize)
				curTexts = make([]string, 0, embedBatchSize)
				curMetas = make([]json.RawMessage, 0, embedBatchSize)
			}
			rows++
			if limit > 0 && rows >= limit {
				return nil
			}
		}
	}()

	// Flush the trailing partial batch only on a clean read.
	if readErr == nil && len(curIDs) > 0 {
		jobs <- job{ids: curIDs, texts: curTexts, metas: curMetas}
	}
	close(jobs)
	wg.Wait()
	close(done)

	fmt.Printf("[sc] final: scanned=%d embedded=%d added=%d\n",
		rows, atomic.LoadInt64(&totalEmbedded), atomic.LoadInt64(&totalAdded))
	return readErr
}
// buildSearchText concatenates the staffing-relevant columns into the
// text that gets embedded. Order: role first (most semantically dense),
// then city/state, skills + certs, and finally the prose resume_text.
// Embedding models weight earlier tokens slightly more.
func buildSearchText(row []string) string {
	pieces := []string{
		row[maxColRole],
		" in ", row[maxColCity],
		", ", row[maxColState],
		". Skills: ", row[maxColSkills],
		". Certifications: ", row[maxColCerts],
		". ", row[maxColResume],
	}
	return strings.Join(pieces, "")
}
// embedBatch POSTs texts to the gateway's /v1/embed endpoint and returns
// one vector per input text, in input order.
//
// Returns an error on transport failure, on a non-200 status (including
// up to 256 bytes of the response body for context), on a malformed
// response, or — a fix over the original — when the vector count does
// not match the text count. The old code returned a short slice as-is,
// and addBatch would later index past its end and panic.
func embedBatch(hc *http.Client, gateway string, texts []string) ([][]float32, error) {
	bs, err := json.Marshal(map[string]any{"texts": texts})
	if err != nil {
		return nil, fmt.Errorf("marshal embed request: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, gateway+"/v1/embed", bytes.NewReader(bs))
	if err != nil {
		return nil, fmt.Errorf("build embed request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return nil, fmt.Errorf("embed status %d: %s", resp.StatusCode, string(preview))
	}
	var er struct {
		Vectors [][]float32 `json:"vectors"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&er); err != nil {
		return nil, err
	}
	if len(er.Vectors) != len(texts) {
		return nil, fmt.Errorf("embed returned %d vectors for %d texts", len(er.Vectors), len(texts))
	}
	return er.Vectors, nil
}
// addItem is one element of the /v1/vectors/index/<name>/add request
// payload. Metadata stays as raw JSON: it was marshalled once in
// populate and passes through here untouched.
type addItem struct {
ID string `json:"id"`
Vector []float32 `json:"vector"`
Metadata json.RawMessage `json:"metadata"`
}
// addBatch pushes a batch into the vectord index via the gateway's
// /add route. ids, vecs, and metas are parallel slices.
//
// Fix over the original: the lengths are validated up front — a short
// vecs slice (e.g. from a partial embed response) used to panic with an
// index-out-of-range instead of returning an error.
func addBatch(hc *http.Client, gateway string, ids []string, vecs [][]float32, metas []json.RawMessage) error {
	if len(vecs) != len(ids) || len(metas) != len(ids) {
		return fmt.Errorf("add batch: mismatched lengths ids=%d vecs=%d metas=%d",
			len(ids), len(vecs), len(metas))
	}
	items := make([]addItem, len(ids))
	for i := range ids {
		items[i] = addItem{ID: ids[i], Vector: vecs[i], Metadata: metas[i]}
	}
	bs, err := json.Marshal(map[string]any{"items": items})
	if err != nil {
		return fmt.Errorf("marshal add request: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost,
		gateway+"/v1/vectors/index/"+indexName+"/add", bytes.NewReader(bs))
	if err != nil {
		return fmt.Errorf("build add request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return fmt.Errorf("add status %d: %s", resp.StatusCode, string(preview))
	}
	// Drain the body so the transport can reuse the connection.
	io.Copy(io.Discard, resp.Body)
	return nil
}
// runQuery embeds q, searches the index for the top 5 neighbours, and
// prints them with per-stage latency. Failures are printed rather than
// fatal so one bad query doesn't stop the rest of the suite.
func runQuery(hc *http.Client, gateway, q string) {
	t0 := time.Now()

	// 1. Embed the query text (batch of one).
	vecs, err := embedBatch(hc, gateway, []string{q})
	if err != nil || len(vecs) == 0 {
		fmt.Printf("[sc] query %q: embed err: %v\n", q, err)
		return
	}
	embedDur := time.Since(t0)

	// 2. Search the populated index.
	t1 := time.Now()
	body := map[string]any{"vector": vecs[0], "k": 5}
	bs, _ := json.Marshal(body) // floats + small int: cannot fail
	req, _ := http.NewRequest(http.MethodPost,
		gateway+"/v1/vectors/index/"+indexName+"/search", bytes.NewReader(bs))
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		fmt.Printf("[sc] query %q: search err: %v\n", q, err)
		return
	}
	defer resp.Body.Close()
	searchDur := time.Since(t1)

	// Fix over the original: surface non-200 responses instead of
	// feeding an error body to the JSON decoder, which produced a
	// confusing decode error or silently-empty results.
	if resp.StatusCode != http.StatusOK {
		preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		fmt.Printf("[sc] query %q: search status %d: %s\n", q, resp.StatusCode, string(preview))
		return
	}

	var sr struct {
		Results []struct {
			ID       string          `json:"id"`
			Distance float32         `json:"distance"`
			Metadata json.RawMessage `json:"metadata"`
		} `json:"results"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil {
		fmt.Printf("[sc] query %q: decode err: %v\n", q, err)
		return
	}

	fmt.Printf("\n[sc] %q (embed=%v search=%v)\n", q, embedDur.Round(time.Millisecond), searchDur.Round(time.Millisecond))
	for i, r := range sr.Results {
		fmt.Printf(" %d. %s d=%.4f %s\n", i+1, r.ID, r.Distance, string(r.Metadata))
	}
}
// httpPostJSON POSTs body as JSON to url and returns the status code
// plus up to 256 bytes of the response body (enough for human-readable
// error reporting). Transport and encoding failures are reported as
// code 0 with the error text.
//
// Fix over the original: json.Marshal and http.NewRequest errors are no
// longer discarded — body is an arbitrary `any` and can fail to encode.
func httpPostJSON(hc *http.Client, url string, body any) (int, string) {
	bs, err := json.Marshal(body)
	if err != nil {
		return 0, err.Error()
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(bs))
	if err != nil {
		return 0, err.Error()
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return 0, err.Error()
	}
	defer resp.Body.Close()
	preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
	// Drain the remainder so the transport can reuse the connection.
	io.Copy(io.Discard, resp.Body)
	return resp.StatusCode, string(preview)
}
// httpDelete issues a DELETE to url and drains the response so the
// connection can be reused. The status code is intentionally ignored:
// callers use this for idempotent cleanup where a 404 is expected.
func httpDelete(hc *http.Client, url string) error {
	req, err := http.NewRequest(http.MethodDelete, url, nil)
	if err != nil {
		// Fix over the original, which discarded this error and would
		// have passed a nil request to Do on a malformed URL.
		return fmt.Errorf("build delete request: %w", err)
	}
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body)
	return nil
}
// keep context.Background reachable in case future paths use it
// (this pins the "context" import so per-request timeout plumbing can
// be added later without touching the import block).
var _ = context.Background