root 6c02c905c8 scrum lift_001: 4 fixes from cross-lineage review
Cross-lineage scrum on b2e45f7 produced 1 convergent + 3 single-reviewer
findings worth fixing. All apply.

1. (Opus WARN + Qwen INFO convergent) scripts/playbook_lift.sh: replace
   sleep 2.5 in SQL probe with active polling up to 5s. refresh_every=1s
   is a lower bound; under load the manifest may not be visible in a
   fixed sleep, which would 4xx the probe and abort the reality run.

2. (Opus INFO) scripts/playbook_lift.sh: report template glued
   "env JUDGE_MODEL" + value as "env JUDGE_MODELqwen2.5:latest" with no
   separator. Replaced two :+/:- substitution chains with a single
   JUDGE_SOURCE variable computed once at the top of the harness.

3. (Opus INFO) scripts/staffing_workers/main.go: -id-prefix "" was silently
   allowed, defeating the flag's purpose (preventing cross-corpus ID
   collisions). Now log.Fatal at startup with an explicit hint.

4. (Opus WARN) cmd/{pathwayd,observerd}/main_test.go: newTestRouter
   returned http.Handler, which callers then type-asserted back to
   chi.Router for chi.Walk. Returning chi.Router directly still satisfies
   http.Handler AND avoids a type assertion that would panic if future
   middleware wraps the router.

Dismissed (with rationale):
- Kimi INFO hardcoded MinIO endpoint: harness is local-by-design.
- Kimi WARN matrixd accepts 502/500: documented; real retriever needs
  real upstreams the test doesn't spin up.
- Qwen INFO queryd strings.Contains: brittle but very low risk; reworking
  it through a typed-error path would add coupling without adding signal.

go test ./cmd/{matrixd,queryd,pathwayd,observerd} all green.

Verdicts at reports/scrum/_evidence/2026-04-30/verdicts/lift_001_*.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 06:27:24 -05:00

317 lines
9.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Staffing workers corpus driver — second-of-two corpora that proves
// the multi-corpus matrix indexer end-to-end. Mirrors the candidates
// driver's parquet pattern but handles multi-chunk arrow tables
// (workers_500k.parquet has multiple row groups, candidates fits in
// one).
//
// Source: /home/profit/lakehouse/data/datasets/workers_500k.parquet
// (500000 rows, 18 cols including role + skills + certifications +
// archetype + reliability scores + resume_text).
//
// IDs are prefixed (default "w-", configurable via -id-prefix) so
// multi-corpus matrix queries returning workers alongside candidates
// ("c-") stay unambiguous in merged results.
//
// Default -limit 5000 because the goal of this driver is multi-corpus
// reality testing, not the 500K stress test (separate concern, see
// project_golang_lakehouse.md scale framing).
package main
import (
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
)
// dim is the embedding vector width handed to corpusingest as the index
// Dimension. It has to agree with what the gateway's embedder returns
// (presumably nomic-embed-text at 768 dims — confirm against gateway config).
const dim = 768
// workersSource implements corpusingest.Source over an in-memory
// arrow.Table loaded from workers_500k.parquet. Unlike the candidates
// driver, this MUST handle multi-chunk arrow columns — a large parquet
// file can hold more than one row group, and each row group becomes its
// own chunk after read.
//
// Rows are emitted in table order by Next: cur is the index of the next
// row to emit and n is the table's total row count.
type workersSource struct {
	cols struct {
		workerID *chunkedInt64
		// Text columns. comm ("communications") is resolved by
		// newWorkersSource — so the parquet must contain it — but Next
		// does not currently read it.
		name, role, city, state, skills, certs, archetype, resume, comm *chunkedString
	}
	n        int64
	cur      int64
	idPrefix string // "w-" for workers, "e-" for ethereal_workers, etc.
}
// chunkedString gives per-row access to a string column whether the
// table came back with one chunk or many. At keeps no cursor — every
// call walks the chunk list from the start — so rows may be read in any
// order, not only forward.
type chunkedString struct {
	// chunks are the column's arrays, in table order.
	chunks []*array.String
	// sizes[i] caches chunks[i].Len() so At can skip whole chunks.
	sizes []int64
}
// newChunkedString wraps every chunk of col as an *array.String and
// records each chunk's length. It fails on the first chunk whose
// concrete type is anything else.
func newChunkedString(col *arrow.Chunked) (*chunkedString, error) {
	out := &chunkedString{}
	parts := col.Chunks()
	for idx := range parts {
		str, isString := parts[idx].(*array.String)
		if !isString {
			return nil, fmt.Errorf("chunk %d is %T, want *array.String", idx, parts[idx])
		}
		out.sizes = append(out.sizes, int64(str.Len()))
		out.chunks = append(out.chunks, str)
	}
	return out, nil
}
// At returns the string at the global row index across all chunks.
// It returns "" when row is out of range (negative, or past the last
// chunk) and when the slot is null.
// O(chunks) per call; fine for our scale (≤5000 rows × ~5 chunks).
func (c *chunkedString) At(row int64) string {
	if row < 0 {
		// A negative index would reach s.Value below and panic.
		return ""
	}
	var offset int64
	for i, s := range c.chunks {
		n := c.sizes[i]
		if row < offset+n {
			local := int(row - offset)
			// The Arrow format does not guarantee what sits behind a null
			// slot, so report null as "" rather than trusting the bytes.
			if s.IsNull(local) {
				return ""
			}
			return s.Value(local)
		}
		offset += n
	}
	return ""
}
// chunkedInt64 is the int64 counterpart of chunkedString: per-row access
// to an int64 column that may span several arrow chunks. At keeps no
// cursor, so rows may be read in any order.
type chunkedInt64 struct {
	// chunks are the column's arrays, in table order.
	chunks []*array.Int64
	// sizes[i] caches chunks[i].Len() so At can skip whole chunks.
	sizes []int64
}
// newChunkedInt64 wraps every chunk of col as an *array.Int64 and
// records each chunk's length. It fails on the first chunk whose
// concrete type is anything else.
func newChunkedInt64(col *arrow.Chunked) (*chunkedInt64, error) {
	out := &chunkedInt64{}
	parts := col.Chunks()
	for idx := range parts {
		ints, isInt64 := parts[idx].(*array.Int64)
		if !isInt64 {
			return nil, fmt.Errorf("chunk %d is %T, want *array.Int64", idx, parts[idx])
		}
		out.sizes = append(out.sizes, int64(ints.Len()))
		out.chunks = append(out.chunks, ints)
	}
	return out, nil
}
// At returns the int64 at the global row index across all chunks.
// It returns 0 when row is out of range (negative, or past the last
// chunk) and when the slot is null.
// O(chunks) per call; fine for our scale.
func (c *chunkedInt64) At(row int64) int64 {
	if row < 0 {
		// A negative index would reach s.Value below and panic.
		return 0
	}
	var offset int64
	for i, s := range c.chunks {
		n := c.sizes[i]
		if row < offset+n {
			local := int(row - offset)
			// The Arrow format does not guarantee the value stored in a
			// null slot, so report null as 0 explicitly.
			if s.IsNull(local) {
				return 0
			}
			return s.Value(local)
		}
		offset += n
	}
	return 0
}
// newWorkersSource opens the parquet file at path, reads it fully into an
// arrow.Table, and resolves every column Next needs. idPrefix is stored on
// the source and prepended to each emitted row ID.
//
// On success the returned cleanup releases the table and closes the
// parquet reader and file; the caller must invoke it once done with the
// source. On error, everything opened so far has already been released
// and cleanup is nil.
//
// Note: the whole table is materialized regardless of how many rows the
// caller will actually consume.
func newWorkersSource(path, idPrefix string) (*workersSource, func(), error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, nil, fmt.Errorf("open parquet: %w", err)
	}
	pf, err := file.NewParquetReader(f)
	if err != nil {
		f.Close()
		return nil, nil, fmt.Errorf("parquet reader: %w", err)
	}
	// closeReaders undoes the two opens above, newest first.
	closeReaders := func() {
		pf.Close()
		f.Close()
	}
	fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	if err != nil {
		closeReaders()
		return nil, nil, fmt.Errorf("arrow reader: %w", err)
	}
	table, err := fr.ReadTable(context.Background())
	if err != nil {
		closeReaders()
		return nil, nil, fmt.Errorf("read table: %w", err)
	}
	cleanup := func() {
		table.Release()
		closeReaders()
	}

	schema := table.Schema()
	// column looks a field up by name and returns its chunked data.
	column := func(name string) (*arrow.Chunked, error) {
		idx := schema.FieldIndices(name)
		if len(idx) == 0 {
			return nil, fmt.Errorf("column %q not found", name)
		}
		return table.Column(idx[0]).Data(), nil
	}
	// stringCol and int64Col also verify every chunk's arrow type. A
	// mismatch is tagged with the column name; otherwise the error would
	// not say which of the columns is wrong.
	stringCol := func(name string) (*chunkedString, error) {
		data, lookupErr := column(name)
		if lookupErr != nil {
			return nil, lookupErr
		}
		cs, typeErr := newChunkedString(data)
		if typeErr != nil {
			return nil, fmt.Errorf("column %q: %w", name, typeErr)
		}
		return cs, nil
	}
	int64Col := func(name string) (*chunkedInt64, error) {
		data, lookupErr := column(name)
		if lookupErr != nil {
			return nil, lookupErr
		}
		ci, typeErr := newChunkedInt64(data)
		if typeErr != nil {
			return nil, fmt.Errorf("column %q: %w", name, typeErr)
		}
		return ci, nil
	}

	src := &workersSource{n: table.NumRows(), idPrefix: idPrefix}
	wid, err := int64Col("worker_id")
	if err != nil {
		cleanup()
		return nil, nil, err
	}
	src.cols.workerID = wid
	for _, t := range []struct {
		name string
		dst  **chunkedString
	}{
		{"name", &src.cols.name},
		{"role", &src.cols.role},
		{"city", &src.cols.city},
		{"state", &src.cols.state},
		{"skills", &src.cols.skills},
		{"certifications", &src.cols.certs},
		{"archetype", &src.cols.archetype},
		{"resume_text", &src.cols.resume},
		{"communications", &src.cols.comm},
	} {
		col, err := stringCol(t.name)
		if err != nil {
			cleanup()
			return nil, nil, err
		}
		*t.dst = col
	}
	return src, cleanup, nil
}
// Next returns the next table row as a corpusingest.Row, or io.EOF once
// all n rows have been emitted.
//
// The row ID is idPrefix followed by worker_id in decimal. Text is the
// embedding input assembled below. Metadata carries the structured
// fields; resume_text is embedded but not copied into metadata.
func (s *workersSource) Next() (corpusingest.Row, error) {
	if s.cur >= s.n {
		return corpusingest.Row{}, io.EOF
	}
	row := s.cur
	s.cur++

	cols := &s.cols
	workerID := cols.workerID.At(row)
	role := cols.role.At(row)
	skills := cols.skills.At(row)
	certs := cols.certs.At(row)
	city := cols.city.At(row)
	state := cols.state.At(row)
	archetype := cols.archetype.At(row)

	// Embed text — the V0 layout, restored after the 2026-04-29 D
	// experiment. Three variants were tested on the query "Forklift
	// operator with OSHA-30 certification, warehouse experience":
	//   V0 (this): structured "Worker role: ... Skills: ... <resume_text>"
	//     → 6 workers in top-8, 0 Forklift, top dist 0.327
	//   V4a (drop): drop labels + resume + archetype, double the role
	//     → 6 workers in top-8, 0 Forklift, top dist 0.254
	//   V4b (resume only): just resume_text, no structured prefix
	//     → 4 workers in top-8 (worse mix), 0 Forklift, top 0.379
	// All three ranked Production Workers / Machine Operators / Line
	// Leads above actual Forklift Operators. Conclusion: the bottleneck
	// is nomic-embed-text 137M's geometry, not text design. Real fixes
	// belong elsewhere — hybrid SQL+semantic (B in the next-step menu) or
	// playbook boost (component 5, already shipped). V0 keeps the best
	// worker/candidate mix.
	text := strings.Join([]string{
		"Worker role: ", role,
		". Skills: ", skills,
		". Certifications: ", certs,
		". Based in ", city,
		", ", state,
		". Archetype: ", archetype,
		". ", cols.resume.At(row),
	}, "")

	meta := map[string]any{
		"worker_id":      workerID,
		"name":           cols.name.At(row),
		"role":           role,
		"city":           city,
		"state":          state,
		"skills":         skills,
		"certifications": certs,
		"archetype":      archetype,
	}
	return corpusingest.Row{
		ID:       fmt.Sprintf("%s%d", s.idPrefix, workerID),
		Text:     text,
		Metadata: meta,
	}, nil
}
// main parses flags, opens the workers parquet as a corpusingest.Source,
// and runs corpusingest.Run against the gateway, then prints ingest stats.
// A corpusingest.ErrPartialFailure is reported as a WARN and the stats are
// still printed; any other ingest error is fatal.
func main() {
	var (
		gateway     = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
		parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/workers_500k.parquet", "workers parquet")
		indexName   = flag.String("index-name", "workers", "vector index name (e.g. workers, ethereal_workers)")
		idPrefix    = flag.String("id-prefix", "w-", "ID prefix to disambiguate worker_id collisions across corpora (e.g. w-, e-)")
		limit       = flag.Int("limit", 5000, "limit rows (0 = all rows; default suits multi-corpus reality testing, not stress)")
		drop        = flag.Bool("drop", true, "DELETE the index before populate")
	)
	flag.Parse()
	// An empty prefix collides cross-corpus — exactly the bug the
	// flag exists to prevent. Force callers to be explicit.
	if *idPrefix == "" {
		log.Fatalf("--id-prefix cannot be empty (use 'w-', 'e-', etc. — IDs collide cross-corpus without one)")
	}
	hc := &http.Client{Timeout: 5 * time.Minute}
	ctx := context.Background()
	src, cleanup, err := newWorkersSource(*parquetPath, *idPrefix)
	if err != nil {
		log.Fatalf("open workers source: %v", err)
	}
	defer cleanup()
	stats, err := corpusingest.Run(ctx, corpusingest.Config{
		GatewayURL:   *gateway,
		IndexName:    *indexName,
		Dimension:    dim,
		Distance:     "cosine",
		EmbedBatch:   16,
		EmbedWorkers: 8,
		AddBatch:     500,
		Limit:        *limit,
		DropExisting: *drop,
		HTTPClient:   hc,
		LogProgress:  10 * time.Second,
	}, src)
	if err != nil {
		if !errors.Is(err, corpusingest.ErrPartialFailure) {
			// log.Fatalf exits via os.Exit, which skips the deferred
			// cleanup above, so release the source explicitly first.
			// cleanup therefore runs exactly once on every path.
			cleanup()
			log.Fatalf("ingest: %v", err)
		}
		fmt.Printf("[%s] WARN partial failure: %v\n", *indexName, err)
	}
	fmt.Printf("[%s] populate: scanned=%d embedded=%d added=%d failed=%d wall=%v\n",
		*indexName, stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches,
		stats.Wall.Round(time.Millisecond))
}