Workers driver embed text reverted to V0 after testing 3 variants
on the "Forklift operator with OSHA-30 certification, warehouse
experience" reality-test query against 5000 workers (which contains
569 actual Forklift Operators per the 31b4088 probe).
V0 (current, restored): "Worker role: <role>. Skills: ...
Certifications: ... <resume_text>"
→ 6 workers in top-8, 0 Forklift Ops,
top distance 0.327, top role
"Production Worker"
V4a (role-doubled): "<role>. <role> with <skills>. ..."
drop archetype + resume_text
→ 6 workers in top-8, 0 Forklift Ops,
top distance 0.254, top role
"Production Worker"
V4b (resume-only): just the resume_text natural-language
sentence, no structured prefix
→ 4 workers in top-8 (WORSE mix —
software-engineer candidates filled
the displaced slots), 0 Forklift Ops,
top distance 0.379
Conclusion: all three variants surface Production Workers / Machine
Operators / Line Leads ABOVE Forklift Operators for this query.
The 569 actual Forklift Operators in the 5000-row sample don't
appear in any top-8. Embed-text design isn't the bottleneck —
nomic-embed-text 137M's geometry doesn't separate "Forklift
Operator" from "Production Worker" / "Machine Operator" / "Line
Lead" in this query's neighborhood.
Real fixes belong elsewhere:
- Hybrid SQL+semantic (B): pre-filter by role/certs via queryd
before semantic ranking. Addresses the gap directly.
- Different embedding model: mxbai-embed-large or a staffing-
fine-tuned model. Costs an Ollama model swap + re-embedding.
- Playbook boost (component 5, already shipped): record
successful Forklift placements; future queries surface those
workers via similarity. Compounds with use.
V0 restored because it has the best worker/candidate mix in top-8
(6 vs 4 in V4b), preserving the multi-corpus reality-test signal
quality even if the role match is imperfect. Comments updated to
record the experiment so future sessions don't relitigate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
309 lines · 8.4 KiB · Go
// Staffing workers corpus driver — second-of-two corpora that proves
// the multi-corpus matrix indexer end-to-end. Mirrors the candidates
// driver's parquet pattern but handles multi-chunk arrow tables
// (workers_500k.parquet has multiple row groups, candidates fits in
// one).
//
// Source: /home/profit/lakehouse/data/datasets/workers_500k.parquet
// (500000 rows, 18 cols including role + skills + certifications +
// archetype + reliability scores + resume_text).
//
// IDs prefixed "w-" so multi-corpus matrix queries returning workers
// alongside candidates ("c-") stay unambiguous in merged results.
//
// Default -limit 5000 because the goal of this driver is multi-corpus
// reality testing, not the 500K stress test (separate concern, see
// project_golang_lakehouse.md scale framing).
package main

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
)

const (
	// indexName is the gateway vector index this driver creates/populates.
	indexName = "workers"
	// dim is the embedding dimension passed to the ingest config —
	// presumably matching the embedding model's output size (the file's
	// comments reference nomic-embed-text); confirm against the gateway.
	dim = 768
)

// workersSource implements corpusingest.Source over an in-memory
// arrow.Table loaded from workers_500k.parquet. Unlike the candidates
// driver, this MUST handle multi-chunk arrow columns — a 500K-row
// parquet has ≥1 row group, each becoming its own chunk after read.
type workersSource struct {
	// cols holds chunk-aware per-column accessors, populated once in
	// newWorkersSource and read row-by-row in Next.
	cols struct {
		workerID *chunkedInt64
		// comm ("communications") is loaded but not currently read by
		// Next; the rest feed the embed text and/or metadata.
		name, role, city, state, skills, certs, archetype, resume, comm *chunkedString
	}
	n   int64 // total rows in the table
	cur int64 // next row Next will emit (forward-only cursor)
}

// chunkedString lets per-row access work whether the table came back
// with one chunk or many. Forward-only iteration; not safe to seek.
type chunkedString struct {
	chunks []*array.String // one entry per arrow chunk, in table order
	sizes  []int64         // cached chunk lengths; sizes[i] == chunks[i].Len()
}

func newChunkedString(col *arrow.Chunked) (*chunkedString, error) {
|
||
cs := &chunkedString{}
|
||
for i, ch := range col.Chunks() {
|
||
s, ok := ch.(*array.String)
|
||
if !ok {
|
||
return nil, fmt.Errorf("chunk %d is %T, want *array.String", i, ch)
|
||
}
|
||
cs.chunks = append(cs.chunks, s)
|
||
cs.sizes = append(cs.sizes, int64(s.Len()))
|
||
}
|
||
return cs, nil
|
||
}
|
||
|
||
// At returns the value at the global row index. O(chunks) per call;
|
||
// fine for our scale (≤5000 rows × ~5 chunks).
|
||
func (c *chunkedString) At(row int64) string {
|
||
var offset int64
|
||
for i, s := range c.chunks {
|
||
n := c.sizes[i]
|
||
if row < offset+n {
|
||
return s.Value(int(row - offset))
|
||
}
|
||
offset += n
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// chunkedInt64 is the int64 counterpart of chunkedString: per-row
// access across one or many arrow chunks.
type chunkedInt64 struct {
	chunks []*array.Int64 // one entry per arrow chunk, in table order
	sizes  []int64        // cached chunk lengths
}

func newChunkedInt64(col *arrow.Chunked) (*chunkedInt64, error) {
|
||
ci := &chunkedInt64{}
|
||
for i, ch := range col.Chunks() {
|
||
s, ok := ch.(*array.Int64)
|
||
if !ok {
|
||
return nil, fmt.Errorf("chunk %d is %T, want *array.Int64", i, ch)
|
||
}
|
||
ci.chunks = append(ci.chunks, s)
|
||
ci.sizes = append(ci.sizes, int64(s.Len()))
|
||
}
|
||
return ci, nil
|
||
}
|
||
|
||
func (c *chunkedInt64) At(row int64) int64 {
|
||
var offset int64
|
||
for i, s := range c.chunks {
|
||
n := c.sizes[i]
|
||
if row < offset+n {
|
||
return s.Value(int(row - offset))
|
||
}
|
||
offset += n
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func newWorkersSource(path string) (*workersSource, func(), error) {
|
||
f, err := os.Open(path)
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("open parquet: %w", err)
|
||
}
|
||
pf, err := file.NewParquetReader(f)
|
||
if err != nil {
|
||
f.Close()
|
||
return nil, nil, fmt.Errorf("parquet reader: %w", err)
|
||
}
|
||
fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
|
||
if err != nil {
|
||
pf.Close()
|
||
f.Close()
|
||
return nil, nil, fmt.Errorf("arrow reader: %w", err)
|
||
}
|
||
table, err := fr.ReadTable(context.Background())
|
||
if err != nil {
|
||
pf.Close()
|
||
f.Close()
|
||
return nil, nil, fmt.Errorf("read table: %w", err)
|
||
}
|
||
|
||
src := &workersSource{n: table.NumRows()}
|
||
schema := table.Schema()
|
||
|
||
stringCol := func(name string) (*chunkedString, error) {
|
||
idx := schema.FieldIndices(name)
|
||
if len(idx) == 0 {
|
||
return nil, fmt.Errorf("column %q not found", name)
|
||
}
|
||
return newChunkedString(table.Column(idx[0]).Data())
|
||
}
|
||
int64Col := func(name string) (*chunkedInt64, error) {
|
||
idx := schema.FieldIndices(name)
|
||
if len(idx) == 0 {
|
||
return nil, fmt.Errorf("column %q not found", name)
|
||
}
|
||
return newChunkedInt64(table.Column(idx[0]).Data())
|
||
}
|
||
|
||
cleanup := func() {
|
||
table.Release()
|
||
pf.Close()
|
||
f.Close()
|
||
}
|
||
|
||
wid, err := int64Col("worker_id")
|
||
if err != nil {
|
||
cleanup()
|
||
return nil, nil, err
|
||
}
|
||
src.cols.workerID = wid
|
||
|
||
for _, t := range []struct {
|
||
name string
|
||
dst **chunkedString
|
||
}{
|
||
{"name", &src.cols.name},
|
||
{"role", &src.cols.role},
|
||
{"city", &src.cols.city},
|
||
{"state", &src.cols.state},
|
||
{"skills", &src.cols.skills},
|
||
{"certifications", &src.cols.certs},
|
||
{"archetype", &src.cols.archetype},
|
||
{"resume_text", &src.cols.resume},
|
||
{"communications", &src.cols.comm},
|
||
} {
|
||
col, err := stringCol(t.name)
|
||
if err != nil {
|
||
cleanup()
|
||
return nil, nil, err
|
||
}
|
||
*t.dst = col
|
||
}
|
||
return src, cleanup, nil
|
||
}
|
||
|
||
func (s *workersSource) Next() (corpusingest.Row, error) {
|
||
if s.cur >= s.n {
|
||
return corpusingest.Row{}, io.EOF
|
||
}
|
||
i := s.cur
|
||
s.cur++
|
||
|
||
workerID := s.cols.workerID.At(i)
|
||
name := s.cols.name.At(i)
|
||
role := s.cols.role.At(i)
|
||
city := s.cols.city.At(i)
|
||
state := s.cols.state.At(i)
|
||
skills := s.cols.skills.At(i)
|
||
certs := s.cols.certs.At(i)
|
||
archetype := s.cols.archetype.At(i)
|
||
resume := s.cols.resume.At(i)
|
||
|
||
// Embed text — restored to V0 after 2026-04-29 D experiment.
|
||
// Three variants tested on a query of "Forklift operator with
|
||
// OSHA-30 certification, warehouse experience":
|
||
// V0 (this): structured "Worker role: ... Skills: ... <resume_text>"
|
||
// → 6 workers in top-8, 0 Forklift, top dist 0.327
|
||
// V4a (drop): drop labels + resume + archetype, double the role
|
||
// → 6 workers in top-8, 0 Forklift, top dist 0.254
|
||
// V4b (resume only): just resume_text, no structured prefix
|
||
// → 4 workers in top-8 (worse mix), 0 Forklift, top 0.379
|
||
// All three surfaced Production Workers / Machine Operators /
|
||
// Line Leads above actual Forklift Operators. Conclusion: the
|
||
// bottleneck is nomic-embed-text 137M's geometry, not text
|
||
// design. Real fixes belong elsewhere — hybrid SQL+semantic
|
||
// (B in next-step menu) or playbook boost (component 5,
|
||
// already shipped). V0 keeps the best worker/candidate mix.
|
||
var b strings.Builder
|
||
b.WriteString("Worker role: ")
|
||
b.WriteString(role)
|
||
b.WriteString(". Skills: ")
|
||
b.WriteString(skills)
|
||
b.WriteString(". Certifications: ")
|
||
b.WriteString(certs)
|
||
b.WriteString(". Based in ")
|
||
b.WriteString(city)
|
||
b.WriteString(", ")
|
||
b.WriteString(state)
|
||
b.WriteString(". Archetype: ")
|
||
b.WriteString(archetype)
|
||
b.WriteString(". ")
|
||
b.WriteString(resume)
|
||
text := b.String()
|
||
|
||
return corpusingest.Row{
|
||
ID: fmt.Sprintf("w-%d", workerID),
|
||
Text: text,
|
||
Metadata: map[string]any{
|
||
"worker_id": workerID,
|
||
"name": name,
|
||
"role": role,
|
||
"city": city,
|
||
"state": state,
|
||
"skills": skills,
|
||
"certifications": certs,
|
||
"archetype": archetype,
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
func main() {
|
||
var (
|
||
gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
|
||
parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/workers_500k.parquet", "workers parquet")
|
||
limit = flag.Int("limit", 5000, "limit rows (0 = all 500K — usually not what you want here)")
|
||
drop = flag.Bool("drop", true, "DELETE workers index before populate")
|
||
)
|
||
flag.Parse()
|
||
|
||
hc := &http.Client{Timeout: 5 * time.Minute}
|
||
ctx := context.Background()
|
||
|
||
src, cleanup, err := newWorkersSource(*parquetPath)
|
||
if err != nil {
|
||
log.Fatalf("open workers source: %v", err)
|
||
}
|
||
defer cleanup()
|
||
|
||
stats, err := corpusingest.Run(ctx, corpusingest.Config{
|
||
GatewayURL: *gateway,
|
||
IndexName: indexName,
|
||
Dimension: dim,
|
||
Distance: "cosine",
|
||
EmbedBatch: 16,
|
||
EmbedWorkers: 8,
|
||
AddBatch: 500,
|
||
Limit: *limit,
|
||
DropExisting: *drop,
|
||
HTTPClient: hc,
|
||
LogProgress: 10 * time.Second,
|
||
}, src)
|
||
if err != nil {
|
||
if errors.Is(err, corpusingest.ErrPartialFailure) {
|
||
fmt.Printf("[workers] WARN partial failure: %v\n", err)
|
||
} else {
|
||
log.Fatalf("ingest: %v", err)
|
||
}
|
||
}
|
||
fmt.Printf("[workers] populate: scanned=%d embedded=%d added=%d failed=%d wall=%v\n",
|
||
stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches,
|
||
stats.Wall.Round(time.Millisecond))
|
||
}
|
||
|