Cross-lineage scrum on b2e45f7 produced 1 convergent + 3 single-reviewer
findings worth fixing. All apply.
1. (Opus WARN + Qwen INFO convergent) scripts/playbook_lift.sh: replace
sleep 2.5 in SQL probe with active polling up to 5s. refresh_every=1s
is a lower bound; under load the manifest may not be visible in a
fixed sleep, which would 4xx the probe and abort the reality run.
2. (Opus INFO) scripts/playbook_lift.sh: report template glued
"env JUDGE_MODEL" + value as "env JUDGE_MODELqwen2.5:latest" with no
separator. Replaced two :+/:- substitution chains with a single
JUDGE_SOURCE variable computed once at the top of the harness.
3. (Opus INFO) scripts/staffing_workers/main.go: -id-prefix "" silently
allowed, defeating the flag's purpose (cross-corpus collision prevention).
Now log.Fatal at startup with an explicit hint.
4. (Opus WARN) cmd/{pathwayd,observerd}/main_test.go: newTestRouter
returned http.Handler then re-cast to chi.Router for chi.Walk.
Returning chi.Router directly satisfies http.Handler AND avoids an
assertion that would panic if future middleware wraps the router.
Dismissed (with rationale):
- Kimi INFO hardcoded MinIO endpoint: harness is local-by-design.
- Kimi WARN matrixd accepts 502/500: documented; real retriever needs
real upstreams the test doesn't spin up.
- Qwen INFO queryd string.Contains: brittle but very low risk; restating
through typed-error path would couple without adding signal.
go test ./cmd/{matrixd,queryd,pathwayd,observerd} all green.
Verdicts at reports/scrum/_evidence/2026-04-30/verdicts/lift_001_*.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
317 lines · 9.0 KiB · Go
// Staffing workers corpus driver — second-of-two corpora that proves
// the multi-corpus matrix indexer end-to-end. Mirrors the candidates
// driver's parquet pattern but handles multi-chunk arrow tables
// (workers_500k.parquet has multiple row groups, candidates fits in
// one).
//
// Source: /home/profit/lakehouse/data/datasets/workers_500k.parquet
// (500000 rows, 18 cols including role + skills + certifications +
// archetype + reliability scores + resume_text).
//
// IDs prefixed "w-" so multi-corpus matrix queries returning workers
// alongside candidates ("c-") stay unambiguous in merged results.
//
// Default -limit 5000 because the goal of this driver is multi-corpus
// reality testing, not the 500K stress test (separate concern, see
// project_golang_lakehouse.md scale framing).

package main
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"flag"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"net/http"
|
||
"os"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/apache/arrow-go/v18/arrow"
|
||
"github.com/apache/arrow-go/v18/arrow/array"
|
||
"github.com/apache/arrow-go/v18/arrow/memory"
|
||
"github.com/apache/arrow-go/v18/parquet/file"
|
||
"github.com/apache/arrow-go/v18/parquet/pqarrow"
|
||
|
||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
|
||
)
|
||
|
||
const (
	// dim is the vector dimension passed to corpusingest.Run
	// (Config.Dimension) when creating the index. Per the embed-text
	// comment in Next, the embedding model is nomic-embed-text —
	// 768 matches that model's output width (TODO confirm against
	// the gateway's embedder config).
	dim = 768
)
|
||
|
||
// workersSource implements corpusingest.Source over an in-memory
// arrow.Table loaded from workers_500k.parquet. Unlike the candidates
// driver, this MUST handle multi-chunk arrow columns — a 500K-row
// parquet has ≥1 row group, each becoming its own chunk after read.
type workersSource struct {
	// cols holds one chunk-aware accessor per ingested parquet column.
	cols struct {
		workerID *chunkedInt64
		name, role, city, state, skills, certs, archetype, resume, comm *chunkedString
	}
	n int64 // total rows in the loaded table
	cur int64 // next row index handed out by Next (forward-only cursor)
	idPrefix string // "w-" for workers, "e-" for ethereal_workers, etc.
}
|
||
|
||
// chunkedString lets per-row access work whether the table came back
// with one chunk or many. At accepts any in-range row index (it scans
// chunk sizes from the start on every call), but the driver only ever
// reads forward through Next.
type chunkedString struct {
	chunks []*array.String // one arrow array per row group
	sizes []int64 // sizes[i] == chunks[i].Len(), cached at construction
}
|
||
|
||
func newChunkedString(col *arrow.Chunked) (*chunkedString, error) {
|
||
cs := &chunkedString{}
|
||
for i, ch := range col.Chunks() {
|
||
s, ok := ch.(*array.String)
|
||
if !ok {
|
||
return nil, fmt.Errorf("chunk %d is %T, want *array.String", i, ch)
|
||
}
|
||
cs.chunks = append(cs.chunks, s)
|
||
cs.sizes = append(cs.sizes, int64(s.Len()))
|
||
}
|
||
return cs, nil
|
||
}
|
||
|
||
// At returns the value at the global row index. O(chunks) per call;
|
||
// fine for our scale (≤5000 rows × ~5 chunks).
|
||
func (c *chunkedString) At(row int64) string {
|
||
var offset int64
|
||
for i, s := range c.chunks {
|
||
n := c.sizes[i]
|
||
if row < offset+n {
|
||
return s.Value(int(row - offset))
|
||
}
|
||
offset += n
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// chunkedInt64 is the int64 counterpart of chunkedString: per-row
// access across one or many arrow chunks of a parquet column.
type chunkedInt64 struct {
	chunks []*array.Int64 // one arrow array per row group
	sizes []int64 // sizes[i] == chunks[i].Len(), cached at construction
}
|
||
|
||
func newChunkedInt64(col *arrow.Chunked) (*chunkedInt64, error) {
|
||
ci := &chunkedInt64{}
|
||
for i, ch := range col.Chunks() {
|
||
s, ok := ch.(*array.Int64)
|
||
if !ok {
|
||
return nil, fmt.Errorf("chunk %d is %T, want *array.Int64", i, ch)
|
||
}
|
||
ci.chunks = append(ci.chunks, s)
|
||
ci.sizes = append(ci.sizes, int64(s.Len()))
|
||
}
|
||
return ci, nil
|
||
}
|
||
|
||
func (c *chunkedInt64) At(row int64) int64 {
|
||
var offset int64
|
||
for i, s := range c.chunks {
|
||
n := c.sizes[i]
|
||
if row < offset+n {
|
||
return s.Value(int(row - offset))
|
||
}
|
||
offset += n
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func newWorkersSource(path, idPrefix string) (*workersSource, func(), error) {
|
||
f, err := os.Open(path)
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("open parquet: %w", err)
|
||
}
|
||
pf, err := file.NewParquetReader(f)
|
||
if err != nil {
|
||
f.Close()
|
||
return nil, nil, fmt.Errorf("parquet reader: %w", err)
|
||
}
|
||
fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
|
||
if err != nil {
|
||
pf.Close()
|
||
f.Close()
|
||
return nil, nil, fmt.Errorf("arrow reader: %w", err)
|
||
}
|
||
table, err := fr.ReadTable(context.Background())
|
||
if err != nil {
|
||
pf.Close()
|
||
f.Close()
|
||
return nil, nil, fmt.Errorf("read table: %w", err)
|
||
}
|
||
|
||
src := &workersSource{n: table.NumRows(), idPrefix: idPrefix}
|
||
schema := table.Schema()
|
||
|
||
stringCol := func(name string) (*chunkedString, error) {
|
||
idx := schema.FieldIndices(name)
|
||
if len(idx) == 0 {
|
||
return nil, fmt.Errorf("column %q not found", name)
|
||
}
|
||
return newChunkedString(table.Column(idx[0]).Data())
|
||
}
|
||
int64Col := func(name string) (*chunkedInt64, error) {
|
||
idx := schema.FieldIndices(name)
|
||
if len(idx) == 0 {
|
||
return nil, fmt.Errorf("column %q not found", name)
|
||
}
|
||
return newChunkedInt64(table.Column(idx[0]).Data())
|
||
}
|
||
|
||
cleanup := func() {
|
||
table.Release()
|
||
pf.Close()
|
||
f.Close()
|
||
}
|
||
|
||
wid, err := int64Col("worker_id")
|
||
if err != nil {
|
||
cleanup()
|
||
return nil, nil, err
|
||
}
|
||
src.cols.workerID = wid
|
||
|
||
for _, t := range []struct {
|
||
name string
|
||
dst **chunkedString
|
||
}{
|
||
{"name", &src.cols.name},
|
||
{"role", &src.cols.role},
|
||
{"city", &src.cols.city},
|
||
{"state", &src.cols.state},
|
||
{"skills", &src.cols.skills},
|
||
{"certifications", &src.cols.certs},
|
||
{"archetype", &src.cols.archetype},
|
||
{"resume_text", &src.cols.resume},
|
||
{"communications", &src.cols.comm},
|
||
} {
|
||
col, err := stringCol(t.name)
|
||
if err != nil {
|
||
cleanup()
|
||
return nil, nil, err
|
||
}
|
||
*t.dst = col
|
||
}
|
||
return src, cleanup, nil
|
||
}
|
||
|
||
func (s *workersSource) Next() (corpusingest.Row, error) {
|
||
if s.cur >= s.n {
|
||
return corpusingest.Row{}, io.EOF
|
||
}
|
||
i := s.cur
|
||
s.cur++
|
||
|
||
workerID := s.cols.workerID.At(i)
|
||
name := s.cols.name.At(i)
|
||
role := s.cols.role.At(i)
|
||
city := s.cols.city.At(i)
|
||
state := s.cols.state.At(i)
|
||
skills := s.cols.skills.At(i)
|
||
certs := s.cols.certs.At(i)
|
||
archetype := s.cols.archetype.At(i)
|
||
resume := s.cols.resume.At(i)
|
||
|
||
// Embed text — restored to V0 after 2026-04-29 D experiment.
|
||
// Three variants tested on a query of "Forklift operator with
|
||
// OSHA-30 certification, warehouse experience":
|
||
// V0 (this): structured "Worker role: ... Skills: ... <resume_text>"
|
||
// → 6 workers in top-8, 0 Forklift, top dist 0.327
|
||
// V4a (drop): drop labels + resume + archetype, double the role
|
||
// → 6 workers in top-8, 0 Forklift, top dist 0.254
|
||
// V4b (resume only): just resume_text, no structured prefix
|
||
// → 4 workers in top-8 (worse mix), 0 Forklift, top 0.379
|
||
// All three surfaced Production Workers / Machine Operators /
|
||
// Line Leads above actual Forklift Operators. Conclusion: the
|
||
// bottleneck is nomic-embed-text 137M's geometry, not text
|
||
// design. Real fixes belong elsewhere — hybrid SQL+semantic
|
||
// (B in next-step menu) or playbook boost (component 5,
|
||
// already shipped). V0 keeps the best worker/candidate mix.
|
||
var b strings.Builder
|
||
b.WriteString("Worker role: ")
|
||
b.WriteString(role)
|
||
b.WriteString(". Skills: ")
|
||
b.WriteString(skills)
|
||
b.WriteString(". Certifications: ")
|
||
b.WriteString(certs)
|
||
b.WriteString(". Based in ")
|
||
b.WriteString(city)
|
||
b.WriteString(", ")
|
||
b.WriteString(state)
|
||
b.WriteString(". Archetype: ")
|
||
b.WriteString(archetype)
|
||
b.WriteString(". ")
|
||
b.WriteString(resume)
|
||
text := b.String()
|
||
|
||
return corpusingest.Row{
|
||
ID: fmt.Sprintf("%s%d", s.idPrefix, workerID),
|
||
Text: text,
|
||
Metadata: map[string]any{
|
||
"worker_id": workerID,
|
||
"name": name,
|
||
"role": role,
|
||
"city": city,
|
||
"state": state,
|
||
"skills": skills,
|
||
"certifications": certs,
|
||
"archetype": archetype,
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
func main() {
|
||
var (
|
||
gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
|
||
parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/workers_500k.parquet", "workers parquet")
|
||
indexName = flag.String("index-name", "workers", "vector index name (e.g. workers, ethereal_workers)")
|
||
idPrefix = flag.String("id-prefix", "w-", "ID prefix to disambiguate worker_id collisions across corpora (e.g. w-, e-)")
|
||
limit = flag.Int("limit", 5000, "limit rows (0 = all rows; default suits multi-corpus reality testing, not stress)")
|
||
drop = flag.Bool("drop", true, "DELETE the index before populate")
|
||
)
|
||
flag.Parse()
|
||
|
||
// An empty prefix collides cross-corpus — exactly the bug the
|
||
// flag exists to prevent. Force callers to be explicit.
|
||
if *idPrefix == "" {
|
||
log.Fatalf("--id-prefix cannot be empty (use 'w-', 'e-', etc. — IDs collide cross-corpus without one)")
|
||
}
|
||
|
||
hc := &http.Client{Timeout: 5 * time.Minute}
|
||
ctx := context.Background()
|
||
|
||
src, cleanup, err := newWorkersSource(*parquetPath, *idPrefix)
|
||
if err != nil {
|
||
log.Fatalf("open workers source: %v", err)
|
||
}
|
||
defer cleanup()
|
||
|
||
stats, err := corpusingest.Run(ctx, corpusingest.Config{
|
||
GatewayURL: *gateway,
|
||
IndexName: *indexName,
|
||
Dimension: dim,
|
||
Distance: "cosine",
|
||
EmbedBatch: 16,
|
||
EmbedWorkers: 8,
|
||
AddBatch: 500,
|
||
Limit: *limit,
|
||
DropExisting: *drop,
|
||
HTTPClient: hc,
|
||
LogProgress: 10 * time.Second,
|
||
}, src)
|
||
if err != nil {
|
||
if errors.Is(err, corpusingest.ErrPartialFailure) {
|
||
fmt.Printf("[%s] WARN partial failure: %v\n", *indexName, err)
|
||
} else {
|
||
log.Fatalf("ingest: %v", err)
|
||
}
|
||
}
|
||
fmt.Printf("[%s] populate: scanned=%d embedded=%d added=%d failed=%d wall=%v\n",
|
||
*indexName, stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches,
|
||
stats.Wall.Round(time.Millisecond))
|
||
}
|
||
|