diff --git a/scripts/multi_corpus_e2e.sh b/scripts/multi_corpus_e2e.sh
new file mode 100755
index 0000000..5cbe2ef
--- /dev/null
+++ b/scripts/multi_corpus_e2e.sh
@@ -0,0 +1,131 @@
+#!/usr/bin/env bash
+# Multi-corpus reality test — first deep-field test with TWO real
+# staffing corpora composed via /v1/matrix/search.
+#
+# Pipeline:
+#   - Bring up the Go stack (storaged, embedd, vectord, matrixd, gateway)
+#   - Ingest workers (5000 rows from workers_500k.parquet)
+#   - Ingest candidates (1000 rows from candidates.parquet)
+#   - Run a real query through /v1/matrix/search with both corpora
+#   - Print the merged top-k with corpus attribution
+#
+# Headline assertion: results include hits from BOTH corpora (the
+# whole point of multi-corpus matrix retrieval).
+#
+# Requires: Ollama on :11434 with nomic-embed-text loaded. Skips
+# (exit 0) when Ollama is absent.
+#
+# Usage: ./scripts/multi_corpus_e2e.sh
+#        ./scripts/multi_corpus_e2e.sh "your custom query"
+
+set -euo pipefail
+cd "$(dirname "$0")/.."
+
+export PATH="$PATH:/usr/local/go/bin"
+
+QUERY="${1:-Forklift operator with OSHA-30 certification, warehouse experience}"
+
+if ! curl -sS --max-time 3 http://localhost:11434/api/tags >/dev/null 2>&1; then
+  echo "[multi-corpus-e2e] Ollama not reachable on :11434 — skipping"
+  exit 0
+fi
+
+echo "[multi-corpus-e2e] building binaries..."
+go build -o bin/ ./cmd/storaged ./cmd/embedd ./cmd/vectord ./cmd/matrixd ./cmd/gateway \
+  ./scripts/staffing_workers ./scripts/staffing_candidates
+
+pkill -f "bin/(storaged|embedd|vectord|matrixd|gateway)" 2>/dev/null || true
+sleep 0.3
+
+PIDS=()
+TMP="$(mktemp -d)"
+CFG="$TMP/e2e.toml"
+
+cleanup() {
+  echo "[multi-corpus-e2e] cleanup"
+  for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done
+  rm -rf "$TMP"
+}
+trap cleanup EXIT INT TERM
+
+# Ephemeral mode (vectord storaged_url=""); same rationale as
+# candidates_e2e — don't pollute MinIO _vectors/ between runs.
+cat > "$CFG" <<TOML
+# (per-service config elided)
+TOML
+
+# Wait for a service's health endpoint to come up on the given port.
+poll_health() {
+  local port="$1"
+  for _ in $(seq 1 100); do
+    if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
+    sleep 0.05
+  done
+  return 1
+}
+
+echo "[multi-corpus-e2e] launching stack..."
+./bin/storaged -config "$CFG" > /tmp/storaged.log 2>&1 & PIDS+=($!)
+poll_health 3211 || { echo "storaged failed"; exit 1; }
+./bin/embedd -config "$CFG" > /tmp/embedd.log 2>&1 & PIDS+=($!)
+poll_health 3216 || { echo "embedd failed"; exit 1; }
+./bin/vectord -config "$CFG" > /tmp/vectord.log 2>&1 & PIDS+=($!)
+poll_health 3215 || { echo "vectord failed"; exit 1; }
+./bin/matrixd -config "$CFG" > /tmp/matrixd.log 2>&1 & PIDS+=($!)
+poll_health 3218 || { echo "matrixd failed"; exit 1; }
+./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 & PIDS+=($!)
+poll_health 3110 || { echo "gateway failed"; exit 1; }
+
+echo
+echo "[multi-corpus-e2e] ingest workers (limit=5000)..."
+./bin/staffing_workers -limit 5000
+
+echo
+echo "[multi-corpus-e2e] ingest candidates..."
+./bin/staffing_candidates -skip-populate=false -query "$QUERY" 2>&1 | grep -v "^\[candidates\]\(matrix\|reality\)" || true
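+
+# Shape note (an assumption inferred from the jq filters below, not a
+# documented schema): each /v1/matrix/search hit is expected to carry
+# its corpus attribution inline, roughly
+#   {"corpus":"workers","id":"w-42","distance":0.1234,
+#    "metadata":{"role":"...","skills":"..."}}
+# which is what makes the per-corpus hit counts below possible.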
+
+echo
+echo "[multi-corpus-e2e] /matrix/corpora — confirm both registered:"
+curl -sS http://127.0.0.1:3110/v1/matrix/corpora | jq -c .
+
+echo
+echo "[multi-corpus-e2e] multi-corpus query: $QUERY"
+RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/matrix/search \
+  -H 'Content-Type: application/json' \
+  -d "{\"query_text\":\"$QUERY\",\"corpora\":[\"workers\",\"candidates\"],\"k\":8,\"per_corpus_k\":6}")"
+
+# Sanity / headline assertions
+WORKER_HITS="$(echo "$RESP" | jq -r '[.results[] | select(.corpus=="workers")] | length')"
+CAND_HITS="$(echo "$RESP" | jq -r '[.results[] | select(.corpus=="candidates")] | length')"
+TOTAL="$(echo "$RESP" | jq -r '.results | length')"
+
+echo
+echo "[multi-corpus-e2e] merged top-$TOTAL: workers=$WORKER_HITS candidates=$CAND_HITS"
+echo "$RESP" | jq -r '.results[] | "  \(.corpus | .[0:1])  d=\(.distance | tostring | .[0:6])  \(.id)  \(.metadata.role // .metadata.skills // "n/a")"'
+
+if [ "$WORKER_HITS" -gt 0 ] && [ "$CAND_HITS" -gt 0 ]; then
+  echo
+  echo "[multi-corpus-e2e] PASS: both corpora represented in merged top-$TOTAL"
+  exit 0
+else
+  echo
+  echo "[multi-corpus-e2e] FAIL: corpus mix was workers=$WORKER_HITS candidates=$CAND_HITS"
+  exit 1
+fi
diff --git a/scripts/staffing_workers/main.go b/scripts/staffing_workers/main.go
new file mode 100644
index 0000000..e0758a5
--- /dev/null
+++ b/scripts/staffing_workers/main.go
@@ -0,0 +1,290 @@
+// Staffing workers corpus driver — the second of two corpora that prove
+// the multi-corpus matrix indexer end-to-end. Mirrors the candidates
+// driver's parquet pattern but handles multi-chunk arrow tables
+// (workers_500k.parquet has multiple row groups; candidates fits in
+// one).
+//
+// Source: /home/profit/lakehouse/data/datasets/workers_500k.parquet
+// (500000 rows, 18 cols including role + skills + certifications +
+// archetype + reliability scores + resume_text).
+//
+// IDs are prefixed "w-" so multi-corpus matrix queries returning workers
+// alongside candidates ("c-") stay unambiguous in merged results.
+//
+// Default -limit 5000 because the goal of this driver is multi-corpus
+// reality testing, not the 500K stress test (a separate concern; see
+// project_golang_lakehouse.md scale framing).
+
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/parquet/file"
+	"github.com/apache/arrow-go/v18/parquet/pqarrow"
+
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
+)
+
+const (
+	indexName = "workers"
+	dim       = 768
+)
+
+// workersSource implements corpusingest.Source over an in-memory
+// arrow.Table loaded from workers_500k.parquet. Unlike the candidates
+// driver, this MUST handle multi-chunk arrow columns — a 500K-row
+// parquet spans multiple row groups, each becoming its own chunk after
+// read.
+type workersSource struct {
+	cols struct {
+		workerID                                                        *chunkedInt64
+		name, role, city, state, skills, certs, archetype, resume, comm *chunkedString
+	}
+	n   int64
+	cur int64
+}
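+
+// Worked example of the chunk layout this guards against (sizes are
+// illustrative, not measured from the file): pqarrow materializes each
+// row group as its own chunk, so a 500000-row column might come back
+// as chunks of [131072, 131072, 131072, 106784]. Global row 200000
+// then lives in chunk 1 at local index 200000-131072 = 68928, which is
+// exactly the offset walk chunkedString.At and chunkedInt64.At below
+// perform.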
+
+// chunkedString lets per-row access work whether the table came back
+// with one chunk or many; At resolves a global row index to the chunk
+// that owns it.
+type chunkedString struct {
+	chunks []*array.String
+	sizes  []int64
+}
+
+func newChunkedString(col *arrow.Chunked) (*chunkedString, error) {
+	cs := &chunkedString{}
+	for i, ch := range col.Chunks() {
+		s, ok := ch.(*array.String)
+		if !ok {
+			return nil, fmt.Errorf("chunk %d is %T, want *array.String", i, ch)
+		}
+		cs.chunks = append(cs.chunks, s)
+		cs.sizes = append(cs.sizes, int64(s.Len()))
+	}
+	return cs, nil
+}
+
+// At returns the value at the global row index. O(chunks) per call;
+// fine for our scale (≤5000 rows × ~5 chunks).
+func (c *chunkedString) At(row int64) string {
+	var offset int64
+	for i, s := range c.chunks {
+		n := c.sizes[i]
+		if row < offset+n {
+			return s.Value(int(row - offset))
+		}
+		offset += n
+	}
+	return ""
+}
+
+type chunkedInt64 struct {
+	chunks []*array.Int64
+	sizes  []int64
+}
+
+func newChunkedInt64(col *arrow.Chunked) (*chunkedInt64, error) {
+	ci := &chunkedInt64{}
+	for i, ch := range col.Chunks() {
+		s, ok := ch.(*array.Int64)
+		if !ok {
+			return nil, fmt.Errorf("chunk %d is %T, want *array.Int64", i, ch)
+		}
+		ci.chunks = append(ci.chunks, s)
+		ci.sizes = append(ci.sizes, int64(s.Len()))
+	}
+	return ci, nil
+}
+
+func (c *chunkedInt64) At(row int64) int64 {
+	var offset int64
+	for i, s := range c.chunks {
+		n := c.sizes[i]
+		if row < offset+n {
+			return s.Value(int(row - offset))
+		}
+		offset += n
+	}
+	return 0
+}
+
+func newWorkersSource(path string) (*workersSource, func(), error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, nil, fmt.Errorf("open parquet: %w", err)
+	}
+	pf, err := file.NewParquetReader(f)
+	if err != nil {
+		f.Close()
+		return nil, nil, fmt.Errorf("parquet reader: %w", err)
+	}
+	fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
+	if err != nil {
+		pf.Close()
+		f.Close()
+		return nil, nil, fmt.Errorf("arrow reader: %w", err)
+	}
+	table, err := fr.ReadTable(context.Background())
+	if err != nil {
+		pf.Close()
+		f.Close()
+		return nil, nil, fmt.Errorf("read table: %w", err)
+	}
+
+	src := &workersSource{n: table.NumRows()}
+	schema := table.Schema()
+
+	stringCol := func(name string) (*chunkedString, error) {
+		idx := schema.FieldIndices(name)
+		if len(idx) == 0 {
+			return nil, fmt.Errorf("column %q not found", name)
+		}
+		return newChunkedString(table.Column(idx[0]).Data())
+	}
+	int64Col := func(name string) (*chunkedInt64, error) {
+		idx := schema.FieldIndices(name)
+		if len(idx) == 0 {
+			return nil, fmt.Errorf("column %q not found", name)
+		}
+		return newChunkedInt64(table.Column(idx[0]).Data())
+	}
+
+	cleanup := func() {
+		table.Release()
+		pf.Close()
+		f.Close()
+	}
+
+	wid, err := int64Col("worker_id")
+	if err != nil {
+		cleanup()
+		return nil, nil, err
+	}
+	src.cols.workerID = wid
+
+	for _, t := range []struct {
+		name string
+		dst  **chunkedString
+	}{
+		{"name", &src.cols.name},
+		{"role", &src.cols.role},
+		{"city", &src.cols.city},
+		{"state", &src.cols.state},
+		{"skills", &src.cols.skills},
+		{"certifications", &src.cols.certs},
+		{"archetype", &src.cols.archetype},
+		{"resume_text", &src.cols.resume},
+		{"communications", &src.cols.comm},
+	} {
+		col, err := stringCol(t.name)
+		if err != nil {
+			cleanup()
+			return nil, nil, err
+		}
+		*t.dst = col
+	}
+	return src, cleanup, nil
+}
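+
+// If lookups ever dominate at larger scale, the linear scan in At can
+// be swapped for precomputed prefix offsets plus binary search. A
+// sketch only (not used here; assumes "sort" is imported and offsets
+// is built once in the constructor):
+//
+//	offsets := make([]int64, len(c.sizes)+1)
+//	for i, n := range c.sizes {
+//		offsets[i+1] = offsets[i] + n
+//	}
+//	k := sort.Search(len(c.chunks), func(i int) bool { return offsets[i+1] > row })
+//	return c.chunks[k].Value(int(row - offsets[k]))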
+
+func (s *workersSource) Next() (corpusingest.Row, error) {
+	if s.cur >= s.n {
+		return corpusingest.Row{}, io.EOF
+	}
+	i := s.cur
+	s.cur++
+
+	workerID := s.cols.workerID.At(i)
+	name := s.cols.name.At(i)
+	role := s.cols.role.At(i)
+	city := s.cols.city.At(i)
+	state := s.cols.state.At(i)
+	skills := s.cols.skills.At(i)
+	certs := s.cols.certs.At(i)
+	archetype := s.cols.archetype.At(i)
+	resume := s.cols.resume.At(i)
+
+	// Embed text: role first (most semantically dense for staffing
+	// queries), then skills + certs, then location, archetype, finally
+	// the prose resume. Same ordering rationale as the candidates
+	// driver and the original 500K driver.
+	var b strings.Builder
+	b.WriteString("Worker role: ")
+	b.WriteString(role)
+	b.WriteString(". Skills: ")
+	b.WriteString(skills)
+	b.WriteString(". Certifications: ")
+	b.WriteString(certs)
+	b.WriteString(". Based in ")
+	b.WriteString(city)
+	b.WriteString(", ")
+	b.WriteString(state)
+	b.WriteString(". Archetype: ")
+	b.WriteString(archetype)
+	b.WriteString(". ")
+	b.WriteString(resume)
+
+	return corpusingest.Row{
+		ID:   fmt.Sprintf("w-%d", workerID),
+		Text: b.String(),
+		Metadata: map[string]any{
+			"worker_id":      workerID,
+			"name":           name,
+			"role":           role,
+			"city":           city,
+			"state":          state,
+			"skills":         skills,
+			"certifications": certs,
+			"archetype":      archetype,
+		},
+	}, nil
+}
+
+func main() {
+	var (
+		gateway     = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
+		parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/workers_500k.parquet", "workers parquet")
+		limit       = flag.Int("limit", 5000, "limit rows (0 = all 500K — usually not what you want here)")
+		drop        = flag.Bool("drop", true, "DELETE workers index before populate")
+	)
+	flag.Parse()
+
+	hc := &http.Client{Timeout: 5 * time.Minute}
+	ctx := context.Background()
+
+	src, cleanup, err := newWorkersSource(*parquetPath)
+	if err != nil {
+		log.Fatalf("open workers source: %v", err)
+	}
+	defer cleanup()
+
+	stats, err := corpusingest.Run(ctx, corpusingest.Config{
+		GatewayURL:   *gateway,
+		IndexName:    indexName,
+		Dimension:    dim,
+		Distance:     "cosine",
+		EmbedBatch:   16,
+		EmbedWorkers: 8,
+		AddBatch:     500,
+		Limit:        *limit,
+		DropExisting: *drop,
+		HTTPClient:   hc,
+		LogProgress:  10 * time.Second,
+	}, src)
+	if err != nil {
+		log.Fatalf("ingest: %v", err)
+	}
+	fmt.Printf("[workers] populate: scanned=%d embedded=%d added=%d wall=%v\n",
+		stats.Scanned, stats.Embedded, stats.Added, stats.Wall.Round(time.Millisecond))
+}
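+
+// Back-of-envelope for the defaults above (an estimate, not a measured
+// result): 5000 rows at EmbedBatch=16 is ~313 embed requests, spread
+// across EmbedWorkers=8, i.e. roughly 40 sequential batches per worker,
+// and AddBatch=500 means the vectors land in 10 gateway add calls.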