The 5-loop substrate's load-bearing gate is verified — playbook +
matrix indexer give the results we're looking for. Per the report's
rubric, lift ≥ 50% of discoveries means matrix is doing real work;
7/8 = 87.5% blew through that.
Harness was structurally hiding bugs behind a 5-daemon stripped boot.
Expanding to the full 10-daemon prod stack surfaced 7 fixes in cascade:
1. driver→matrixd: {"query": ...} → {"query_text": ...} field name
2. harness temp toml missing [s3] → wrong default bucket → catalogd
rehydrate 500 on first call
3. harness→queryd SQL probe: {"q": ...} → {"sql": ...} field name
4. expand boot from 5 → 10 daemons in dep-ordered launch
5. add SQL surface probe (3-row CSV ingest → COUNT(*)=3 assertion)
6. candidates corpus was synthetic SWE-tech (Swift/iOS, Scala/Spark) —
wrong domain for staffing queries; replaced with ethereal_workers
(10K rows, real staffing schema, "e-" id prefix to avoid collision
with workers' "w-"). staffing_workers driver gains -index-name +
-id-prefix flags so the same binary serves both corpora
7. local_judge qwen3.5:latest is a vision-SSM 256K-ctx build running
~30s per judge call against the lift loop; reverted to
qwen2.5:latest (~1s/call, 30× faster, held lift theory)
Each contract drift (1, 3) is now locked into a cmd/<bin>/main_test.go
so future drift fires in `go test`, not in a reality run. R-005 closed:
- cmd/matrixd/main_test.go (new) — playbook record drift detector +
score bounds + 6 routes mounted
- cmd/queryd/main_test.go — wrong-field-name drift detector
- cmd/pathwayd/main_test.go (new) — 9 routes + add round-trip + retire
- cmd/observerd/main_test.go (new) — 4 routes + invalid-op + unknown-mode
`go test ./cmd/{matrixd,queryd,pathwayd,observerd}` all green.
Reality test results (reports/reality-tests/playbook_lift_001.{json,md}):
Queries 21 (staffing-domain, 7 categories)
Discoveries 8 (judge ≠ cosine top-1)
Lifts 7/8 (87.5%)
Boosts triggered 9
Mean Δ distance -0.053 (warm closer than cold)
OOD honesty dental/RN/SWE rated 1, no fake matches
Cross-corpus boosts confirmed (e- ↔ w- swaps in lifts)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
311 lines · 8.8 KiB · Go
// Staffing workers corpus driver — second-of-two corpora that proves
// the multi-corpus matrix indexer end-to-end. Mirrors the candidates
// driver's parquet pattern but handles multi-chunk arrow tables
// (workers_500k.parquet has multiple row groups, candidates fits in
// one).
//
// Source: /home/profit/lakehouse/data/datasets/workers_500k.parquet
// (500000 rows, 18 cols including role + skills + certifications +
// archetype + reliability scores + resume_text).
//
// IDs prefixed "w-" so multi-corpus matrix queries returning workers
// alongside candidates ("c-") stay unambiguous in merged results.
//
// Default -limit 5000 because the goal of this driver is multi-corpus
// reality testing, not the 500K stress test (separate concern, see
// project_golang_lakehouse.md scale framing).
package main
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"flag"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"net/http"
|
||
"os"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/apache/arrow-go/v18/arrow"
|
||
"github.com/apache/arrow-go/v18/arrow/array"
|
||
"github.com/apache/arrow-go/v18/arrow/memory"
|
||
"github.com/apache/arrow-go/v18/parquet/file"
|
||
"github.com/apache/arrow-go/v18/parquet/pqarrow"
|
||
|
||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
|
||
)
|
||
|
||
// dim is the embedding vector dimension used when creating the vector
// index (passed as Dimension in the ingest config).
const dim = 768
|
||
|
||
// workersSource implements corpusingest.Source over an in-memory
|
||
// arrow.Table loaded from workers_500k.parquet. Unlike the candidates
|
||
// driver, this MUST handle multi-chunk arrow columns — a 500K-row
|
||
// parquet has ≥1 row group, each becoming its own chunk after read.
|
||
type workersSource struct {
|
||
cols struct {
|
||
workerID *chunkedInt64
|
||
name, role, city, state, skills, certs, archetype, resume, comm *chunkedString
|
||
}
|
||
n int64
|
||
cur int64
|
||
idPrefix string // "w-" for workers, "e-" for ethereal_workers, etc.
|
||
}
|
||
|
||
// chunkedString lets per-row access work whether the table came back
// with one chunk or many. At is random-access by global row index
// (an O(chunks) scan per call) — see At for the lookup.
type chunkedString struct {
	chunks []*array.String // one entry per arrow chunk, in table order
	sizes  []int64         // row count of each chunk, parallel to chunks
}
|
||
|
||
func newChunkedString(col *arrow.Chunked) (*chunkedString, error) {
|
||
cs := &chunkedString{}
|
||
for i, ch := range col.Chunks() {
|
||
s, ok := ch.(*array.String)
|
||
if !ok {
|
||
return nil, fmt.Errorf("chunk %d is %T, want *array.String", i, ch)
|
||
}
|
||
cs.chunks = append(cs.chunks, s)
|
||
cs.sizes = append(cs.sizes, int64(s.Len()))
|
||
}
|
||
return cs, nil
|
||
}
|
||
|
||
// At returns the value at the global row index. O(chunks) per call;
|
||
// fine for our scale (≤5000 rows × ~5 chunks).
|
||
func (c *chunkedString) At(row int64) string {
|
||
var offset int64
|
||
for i, s := range c.chunks {
|
||
n := c.sizes[i]
|
||
if row < offset+n {
|
||
return s.Value(int(row - offset))
|
||
}
|
||
offset += n
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// chunkedInt64 is the int64 analogue of chunkedString: random-access
// per-row reads across one or many arrow chunks.
type chunkedInt64 struct {
	chunks []*array.Int64 // one entry per arrow chunk, in table order
	sizes  []int64        // row count of each chunk, parallel to chunks
}
|
||
|
||
func newChunkedInt64(col *arrow.Chunked) (*chunkedInt64, error) {
|
||
ci := &chunkedInt64{}
|
||
for i, ch := range col.Chunks() {
|
||
s, ok := ch.(*array.Int64)
|
||
if !ok {
|
||
return nil, fmt.Errorf("chunk %d is %T, want *array.Int64", i, ch)
|
||
}
|
||
ci.chunks = append(ci.chunks, s)
|
||
ci.sizes = append(ci.sizes, int64(s.Len()))
|
||
}
|
||
return ci, nil
|
||
}
|
||
|
||
func (c *chunkedInt64) At(row int64) int64 {
|
||
var offset int64
|
||
for i, s := range c.chunks {
|
||
n := c.sizes[i]
|
||
if row < offset+n {
|
||
return s.Value(int(row - offset))
|
||
}
|
||
offset += n
|
||
}
|
||
return 0
|
||
}
|
||
|
||
// newWorkersSource opens the parquet at path, reads the whole file into
// an in-memory arrow table, and wires up chunk-aware accessors for the
// columns the driver needs. It returns the source, a cleanup func that
// releases the table and closes both readers (call it exactly once when
// done), and an error. On error, everything acquired so far is released
// before returning.
func newWorkersSource(path, idPrefix string) (*workersSource, func(), error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, nil, fmt.Errorf("open parquet: %w", err)
	}
	pf, err := file.NewParquetReader(f)
	if err != nil {
		f.Close()
		return nil, nil, fmt.Errorf("parquet reader: %w", err)
	}
	fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	if err != nil {
		pf.Close()
		f.Close()
		return nil, nil, fmt.Errorf("arrow reader: %w", err)
	}
	// Whole-table read is deliberate: -limit keeps ingest small, and the
	// 500K stress case is out of scope per the file header.
	table, err := fr.ReadTable(context.Background())
	if err != nil {
		pf.Close()
		f.Close()
		return nil, nil, fmt.Errorf("read table: %w", err)
	}

	src := &workersSource{n: table.NumRows(), idPrefix: idPrefix}
	schema := table.Schema()

	// Column lookup helpers: resolve a field name against the schema and
	// wrap the column, erroring when the name is absent.
	stringCol := func(name string) (*chunkedString, error) {
		idx := schema.FieldIndices(name)
		if len(idx) == 0 {
			return nil, fmt.Errorf("column %q not found", name)
		}
		return newChunkedString(table.Column(idx[0]).Data())
	}
	int64Col := func(name string) (*chunkedInt64, error) {
		idx := schema.FieldIndices(name)
		if len(idx) == 0 {
			return nil, fmt.Errorf("column %q not found", name)
		}
		return newChunkedInt64(table.Column(idx[0]).Data())
	}

	// From here on cleanup owns all three resources; error paths below
	// call it instead of closing piecemeal.
	cleanup := func() {
		table.Release()
		pf.Close()
		f.Close()
	}

	wid, err := int64Col("worker_id")
	if err != nil {
		cleanup()
		return nil, nil, err
	}
	src.cols.workerID = wid

	// All remaining columns are strings; wire each by name into its
	// destination field, failing fast on the first missing column.
	for _, t := range []struct {
		name string
		dst  **chunkedString
	}{
		{"name", &src.cols.name},
		{"role", &src.cols.role},
		{"city", &src.cols.city},
		{"state", &src.cols.state},
		{"skills", &src.cols.skills},
		{"certifications", &src.cols.certs},
		{"archetype", &src.cols.archetype},
		{"resume_text", &src.cols.resume},
		{"communications", &src.cols.comm},
	} {
		col, err := stringCol(t.name)
		if err != nil {
			cleanup()
			return nil, nil, err
		}
		*t.dst = col
	}
	return src, cleanup, nil
}
|
||
|
||
func (s *workersSource) Next() (corpusingest.Row, error) {
|
||
if s.cur >= s.n {
|
||
return corpusingest.Row{}, io.EOF
|
||
}
|
||
i := s.cur
|
||
s.cur++
|
||
|
||
workerID := s.cols.workerID.At(i)
|
||
name := s.cols.name.At(i)
|
||
role := s.cols.role.At(i)
|
||
city := s.cols.city.At(i)
|
||
state := s.cols.state.At(i)
|
||
skills := s.cols.skills.At(i)
|
||
certs := s.cols.certs.At(i)
|
||
archetype := s.cols.archetype.At(i)
|
||
resume := s.cols.resume.At(i)
|
||
|
||
// Embed text — restored to V0 after 2026-04-29 D experiment.
|
||
// Three variants tested on a query of "Forklift operator with
|
||
// OSHA-30 certification, warehouse experience":
|
||
// V0 (this): structured "Worker role: ... Skills: ... <resume_text>"
|
||
// → 6 workers in top-8, 0 Forklift, top dist 0.327
|
||
// V4a (drop): drop labels + resume + archetype, double the role
|
||
// → 6 workers in top-8, 0 Forklift, top dist 0.254
|
||
// V4b (resume only): just resume_text, no structured prefix
|
||
// → 4 workers in top-8 (worse mix), 0 Forklift, top 0.379
|
||
// All three surfaced Production Workers / Machine Operators /
|
||
// Line Leads above actual Forklift Operators. Conclusion: the
|
||
// bottleneck is nomic-embed-text 137M's geometry, not text
|
||
// design. Real fixes belong elsewhere — hybrid SQL+semantic
|
||
// (B in next-step menu) or playbook boost (component 5,
|
||
// already shipped). V0 keeps the best worker/candidate mix.
|
||
var b strings.Builder
|
||
b.WriteString("Worker role: ")
|
||
b.WriteString(role)
|
||
b.WriteString(". Skills: ")
|
||
b.WriteString(skills)
|
||
b.WriteString(". Certifications: ")
|
||
b.WriteString(certs)
|
||
b.WriteString(". Based in ")
|
||
b.WriteString(city)
|
||
b.WriteString(", ")
|
||
b.WriteString(state)
|
||
b.WriteString(". Archetype: ")
|
||
b.WriteString(archetype)
|
||
b.WriteString(". ")
|
||
b.WriteString(resume)
|
||
text := b.String()
|
||
|
||
return corpusingest.Row{
|
||
ID: fmt.Sprintf("%s%d", s.idPrefix, workerID),
|
||
Text: text,
|
||
Metadata: map[string]any{
|
||
"worker_id": workerID,
|
||
"name": name,
|
||
"role": role,
|
||
"city": city,
|
||
"state": state,
|
||
"skills": skills,
|
||
"certifications": certs,
|
||
"archetype": archetype,
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
func main() {
|
||
var (
|
||
gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
|
||
parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/workers_500k.parquet", "workers parquet")
|
||
indexName = flag.String("index-name", "workers", "vector index name (e.g. workers, ethereal_workers)")
|
||
idPrefix = flag.String("id-prefix", "w-", "ID prefix to disambiguate worker_id collisions across corpora (e.g. w-, e-)")
|
||
limit = flag.Int("limit", 5000, "limit rows (0 = all rows; default suits multi-corpus reality testing, not stress)")
|
||
drop = flag.Bool("drop", true, "DELETE the index before populate")
|
||
)
|
||
flag.Parse()
|
||
|
||
hc := &http.Client{Timeout: 5 * time.Minute}
|
||
ctx := context.Background()
|
||
|
||
src, cleanup, err := newWorkersSource(*parquetPath, *idPrefix)
|
||
if err != nil {
|
||
log.Fatalf("open workers source: %v", err)
|
||
}
|
||
defer cleanup()
|
||
|
||
stats, err := corpusingest.Run(ctx, corpusingest.Config{
|
||
GatewayURL: *gateway,
|
||
IndexName: *indexName,
|
||
Dimension: dim,
|
||
Distance: "cosine",
|
||
EmbedBatch: 16,
|
||
EmbedWorkers: 8,
|
||
AddBatch: 500,
|
||
Limit: *limit,
|
||
DropExisting: *drop,
|
||
HTTPClient: hc,
|
||
LogProgress: 10 * time.Second,
|
||
}, src)
|
||
if err != nil {
|
||
if errors.Is(err, corpusingest.ErrPartialFailure) {
|
||
fmt.Printf("[%s] WARN partial failure: %v\n", *indexName, err)
|
||
} else {
|
||
log.Fatalf("ingest: %v", err)
|
||
}
|
||
}
|
||
fmt.Printf("[%s] populate: scanned=%d embedded=%d added=%d failed=%d wall=%v\n",
|
||
*indexName, stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches,
|
||
stats.Wall.Round(time.Millisecond))
|
||
}
|
||
|