root a730fc2016 scrum fixes: 4 real findings landed, 4 false positives dismissed
Cross-lineage scrum review on the 12 commits of this session
(afbb506..06e7152) via Rust gateway :3100 with Opus + Kimi +
Qwen3-coder. Results:

  Real findings landed:
    1. Opus BLOCK — vectord BatchAdd intra-batch duplicates panic
       coder/hnsw's "node not added" length-invariant. Fixed with
       last-write-wins dedup inside BatchAdd before the pre-pass.
       Regression test TestBatchAdd_IntraBatchDedup added.
    2. Opus + Kimi convergent WARN — strings.Contains(err.Error(),
       "status 404") was brittle string-matching to detect cold-
       start playbook state. Fixed: ErrCorpusNotFound sentinel
       returned by searchCorpus on HTTP 404; fetchPlaybookHits
       uses errors.Is.
    3. Opus WARN — corpusingest.Run returned nil on total batch
       failure, masking broken pipelines as "empty corpora." Fixed:
       Stats.FailedBatches counter, ErrPartialFailure sentinel
       returned when nonzero. New regression test
       TestRun_NonzeroFailedBatchesReturnsError.
    4. Opus WARN — dead var _ = io.EOF in staffing_500k/main.go
       was justified by a fictional comment. Removed.

  Drivers (staffing_500k, staffing_candidates, staffing_workers)
  updated to handle ErrPartialFailure gracefully — print warn, keep
  running queries — rather than fatal'ing on transient hiccups
  while still surfacing the failure clearly in the output.

  Documented (no code change):
    - Opus WARN: matrixd /matrix/downgrade reads
      LH_FORCE_FULL_ENRICHMENT from process env when body omits
      it. Comment now explains the opinionated default and points
      callers wanting deterministic behavior to pass the field
      explicitly.

  False positives dismissed (caught and verified, NOT acted on):
    A. Kimi BLOCK on errors.Is + wrapped error in cmd/matrixd:223.
       Verified false: Search wraps with %w (fmt.Errorf("%w: %v",
       ErrEmbed, err)), so errors.Is matches the chain correctly.
    B. Kimi INFO "BatchAdd has no unit tests." Verified false:
       batch_bench_test.go has BenchmarkBatchAdd; the new dedup
       test TestBatchAdd_IntraBatchDedup adds another.
    C. Opus BLOCK on missing finite/zero-norm pre-validation in
       cmd/vectord:280-291. Verified false: line 272 already calls
       vectord.ValidateVector before BatchAdd, so finite + zero-
       norm IS checked. Pre-validation is exhaustive.
    D. Opus WARN on relevance.go tokenRe (Opus self-corrected
       mid-finding when realizing leading char counts toward token
       length).

  Qwen3-coder returned NO FINDINGS — known issue with very long
  diffs through the OpenRouter free tier; lineage rotation worked
  as designed (Opus + Kimi between them caught everything Qwen
  would have).

15-smoke regression sweep all green (D1-D6, G1, G1P, G2,
storaged_cap, pathway, matrix, relevance, downgrade, playbook).
Unit tests all green (corpusingest +1, vectord +1).

Per feedback_cross_lineage_review.md: convergent finding #2 (404
detection) is the highest-signal one — both Opus and Kimi
flagged it independently. The other Opus findings stand on
single-reviewer signal but each one verified against the actual
code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 19:42:39 -05:00

297 lines
7.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Staffing workers corpus driver — second-of-two corpora that proves
// the multi-corpus matrix indexer end-to-end. Mirrors the candidates
// driver's parquet pattern but handles multi-chunk arrow tables
// (workers_500k.parquet has multiple row groups, candidates fits in
// one).
//
// Source: /home/profit/lakehouse/data/datasets/workers_500k.parquet
// (500000 rows, 18 cols including role + skills + certifications +
// archetype + reliability scores + resume_text).
//
// IDs prefixed "w-" so multi-corpus matrix queries returning workers
// alongside candidates ("c-") stay unambiguous in merged results.
//
// Default -limit 5000 because the goal of this driver is multi-corpus
// reality testing, not the 500K stress test (separate concern, see
// project_golang_lakehouse.md scale framing).
package main
import (
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/corpusingest"
)
// indexName is the name of the index this driver populates; main hands
// it to corpusingest as Config.IndexName.
const indexName = "workers"

// dim is the embedding dimension main hands to corpusingest as
// Config.Dimension.
const dim = 768
// workersSource implements corpusingest.Source over an in-memory arrow
// table read from workers_500k.parquet. Every column is held through a
// chunk-aware wrapper because, unlike the candidates driver's input,
// this table can come back with more than one chunk per column.
type workersSource struct {
	// cols holds one accessor per bound parquet column.
	cols struct {
		workerID  *chunkedInt64
		name      *chunkedString
		role      *chunkedString
		city      *chunkedString
		state     *chunkedString
		skills    *chunkedString
		certs     *chunkedString
		archetype *chunkedString
		resume    *chunkedString
		// comm is bound by newWorkersSource but is not read by Next.
		comm *chunkedString
	}
	n   int64 // total number of rows in the table
	cur int64 // global index of the row the next call to Next returns
}
// chunkedString lets per-row access work whether the table came back
// with one chunk or many: it presents the String chunks of a single
// column behind one global row index. At performs a fresh linear scan
// on every call and keeps no cursor, so rows can be read in any order.
type chunkedString struct {
// chunks holds the column's arrays in chunk order.
chunks []*array.String
// sizes[i] is the row count of chunks[i], captured at construction.
sizes []int64
}
// newChunkedString wraps every chunk of col as an *array.String and
// records each chunk's length alongside it. It stops at the first chunk
// with a different concrete type and returns an error naming the chunk
// index and the type that was found.
func newChunkedString(col *arrow.Chunked) (*chunkedString, error) {
	var out chunkedString
	for idx, chunk := range col.Chunks() {
		switch typed := chunk.(type) {
		case *array.String:
			out.chunks = append(out.chunks, typed)
			out.sizes = append(out.sizes, int64(typed.Len()))
		default:
			return nil, fmt.Errorf("chunk %d is %T, want *array.String", idx, chunk)
		}
	}
	return &out, nil
}
// At returns the string stored at the global row index row. The lookup
// walks the chunks in order, so each call is linear in the number of
// chunks. A row beyond the last chunk yields "". Callers are expected
// to pass row >= 0; a negative row is forwarded unchanged to the first
// chunk's Value.
func (c *chunkedString) At(row int64) string {
	// remaining is row re-expressed relative to the start of the chunk
	// currently being inspected.
	remaining := row
	for i, chunk := range c.chunks {
		size := c.sizes[i]
		if remaining < size {
			return chunk.Value(int(remaining))
		}
		remaining -= size
	}
	return ""
}
// chunkedInt64 is the Int64 counterpart of chunkedString: it presents
// the Int64 chunks of a single column behind one global row index.
type chunkedInt64 struct {
// chunks holds the column's arrays in chunk order.
chunks []*array.Int64
// sizes[i] is the row count of chunks[i], captured at construction.
sizes []int64
}
// newChunkedInt64 wraps every chunk of col as an *array.Int64 and
// records each chunk's length alongside it. It stops at the first chunk
// with a different concrete type and returns an error naming the chunk
// index and the type that was found.
func newChunkedInt64(col *arrow.Chunked) (*chunkedInt64, error) {
	var out chunkedInt64
	for idx, chunk := range col.Chunks() {
		switch typed := chunk.(type) {
		case *array.Int64:
			out.chunks = append(out.chunks, typed)
			out.sizes = append(out.sizes, int64(typed.Len()))
		default:
			return nil, fmt.Errorf("chunk %d is %T, want *array.Int64", idx, chunk)
		}
	}
	return &out, nil
}
// At returns the value stored at the global row index row. The lookup
// walks the chunks in order, so each call is linear in the number of
// chunks. A row beyond the last chunk yields 0, which is
// indistinguishable from a stored zero. Callers are expected to pass
// row >= 0; a negative row is forwarded unchanged to the first chunk's
// Value.
func (c *chunkedInt64) At(row int64) int64 {
	// remaining is row re-expressed relative to the start of the chunk
	// currently being inspected.
	remaining := row
	for i, chunk := range c.chunks {
		size := c.sizes[i]
		if remaining < size {
			return chunk.Value(int(remaining))
		}
		remaining -= size
	}
	return 0
}
// newWorkersSource opens the parquet file at path, reads it completely
// into an in-memory arrow table, and binds the columns this driver uses
// to chunk-aware accessors.
//
// It returns the source together with a cleanup function that releases
// the table and closes the parquet reader and the file; the caller must
// invoke cleanup once it is done with the source. On error both the
// source and cleanup are nil, and everything opened up to that point
// has already been closed.
//
// Required columns: worker_id (Int64 chunks) plus the String columns
// name, role, city, state, skills, certifications, archetype,
// resume_text and communications. A missing column is reported as
// `column "x" not found`; a column whose chunks have the wrong arrow
// type is reported with the column name prefixed, so the offending
// column can be identified among the ten that are bound.
func newWorkersSource(path string) (*workersSource, func(), error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, nil, fmt.Errorf("open parquet: %w", err)
	}
	pf, err := file.NewParquetReader(f)
	if err != nil {
		f.Close()
		return nil, nil, fmt.Errorf("parquet reader: %w", err)
	}
	// closeReaders is shared by the early-error paths and the final
	// cleanup so the teardown order is spelled out only once.
	closeReaders := func() {
		pf.Close()
		f.Close()
	}
	fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	if err != nil {
		closeReaders()
		return nil, nil, fmt.Errorf("arrow reader: %w", err)
	}
	table, err := fr.ReadTable(context.Background())
	if err != nil {
		closeReaders()
		return nil, nil, fmt.Errorf("read table: %w", err)
	}
	cleanup := func() {
		table.Release()
		closeReaders()
	}

	schema := table.Schema()
	// columnIndex resolves name to its position in the table. If the
	// schema has several fields with that name, the first one is used.
	columnIndex := func(name string) (int, error) {
		idx := schema.FieldIndices(name)
		if len(idx) == 0 {
			return 0, fmt.Errorf("column %q not found", name)
		}
		return idx[0], nil
	}

	src := &workersSource{n: table.NumRows()}

	idIdx, err := columnIndex("worker_id")
	if err != nil {
		cleanup()
		return nil, nil, err
	}
	wid, err := newChunkedInt64(table.Column(idIdx).Data())
	if err != nil {
		cleanup()
		return nil, nil, fmt.Errorf("column %q: %w", "worker_id", err)
	}
	src.cols.workerID = wid

	// Each entry pairs a parquet column name with the field that
	// receives its accessor.
	stringCols := []struct {
		name string
		dst  **chunkedString
	}{
		{"name", &src.cols.name},
		{"role", &src.cols.role},
		{"city", &src.cols.city},
		{"state", &src.cols.state},
		{"skills", &src.cols.skills},
		{"certifications", &src.cols.certs},
		{"archetype", &src.cols.archetype},
		{"resume_text", &src.cols.resume},
		{"communications", &src.cols.comm},
	}
	for _, sc := range stringCols {
		colIdx, err := columnIndex(sc.name)
		if err != nil {
			cleanup()
			return nil, nil, err
		}
		col, err := newChunkedString(table.Column(colIdx).Data())
		if err != nil {
			cleanup()
			return nil, nil, fmt.Errorf("column %q: %w", sc.name, err)
		}
		*sc.dst = col
	}
	return src, cleanup, nil
}
// Next returns the row at the current cursor and advances the cursor by
// one. Once every row has been handed out it returns an empty Row and
// io.EOF.
//
// The row ID is the worker_id prefixed with "w-". The embed text lists
// role first, then skills, certifications, location, archetype, and
// finally the resume prose. The worker's name is carried in Metadata
// only and is not part of the embed text; the resume is part of the
// text only and is not copied into Metadata.
func (s *workersSource) Next() (corpusingest.Row, error) {
	if s.cur >= s.n {
		return corpusingest.Row{}, io.EOF
	}
	row := s.cur
	s.cur = row + 1

	cols := &s.cols
	var (
		id        = cols.workerID.At(row)
		name      = cols.name.At(row)
		role      = cols.role.At(row)
		city      = cols.city.At(row)
		state     = cols.state.At(row)
		skills    = cols.skills.At(row)
		certs     = cols.certs.At(row)
		archetype = cols.archetype.At(row)
		resume    = cols.resume.At(row)
	)

	// Literal separators and field values alternate; joining with an
	// empty separator concatenates them in exactly this order.
	text := strings.Join([]string{
		"Worker role: ", role,
		". Skills: ", skills,
		". Certifications: ", certs,
		". Based in ", city, ", ", state,
		". Archetype: ", archetype,
		". ", resume,
	}, "")

	meta := map[string]any{
		"worker_id":      id,
		"name":           name,
		"role":           role,
		"city":           city,
		"state":          state,
		"skills":         skills,
		"certifications": certs,
		"archetype":      archetype,
	}
	return corpusingest.Row{
		ID:       fmt.Sprintf("w-%d", id),
		Text:     text,
		Metadata: meta,
	}, nil
}
// main loads the workers parquet file and ingests it into the "workers"
// index through the gateway, then prints a one-line summary.
//
// Flags:
//
//	-gateway  gateway base URL
//	-parquet  path of the workers parquet file
//	-limit    maximum number of rows to ingest (0 = all)
//	-drop     whether the existing index is dropped before populating
//
// Failure handling: an error that wraps corpusingest.ErrPartialFailure
// is printed as a warning and execution falls through to the summary
// line. Any other ingest error is fatal. Because log.Fatalf exits via
// os.Exit, which does not run deferred calls, the source is cleaned up
// explicitly before the fatal exit.
func main() {
	var (
		gateway     = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
		parquetPath = flag.String("parquet", "/home/profit/lakehouse/data/datasets/workers_500k.parquet", "workers parquet")
		limit       = flag.Int("limit", 5000, "limit rows (0 = all 500K — usually not what you want here)")
		drop        = flag.Bool("drop", true, "DELETE workers index before populate")
	)
	flag.Parse()

	hc := &http.Client{Timeout: 5 * time.Minute}
	ctx := context.Background()

	src, cleanup, err := newWorkersSource(*parquetPath)
	if err != nil {
		log.Fatalf("open workers source: %v", err)
	}
	defer cleanup()

	stats, err := corpusingest.Run(ctx, corpusingest.Config{
		GatewayURL:   *gateway,
		IndexName:    indexName,
		Dimension:    dim,
		Distance:     "cosine",
		EmbedBatch:   16,
		EmbedWorkers: 8,
		AddBatch:     500,
		Limit:        *limit,
		DropExisting: *drop,
		HTTPClient:   hc,
		LogProgress:  10 * time.Second,
	}, src)
	if err != nil {
		if !errors.Is(err, corpusingest.ErrPartialFailure) {
			// The deferred cleanup will not run once log.Fatalf calls
			// os.Exit, so release the table and files by hand first.
			cleanup()
			log.Fatalf("ingest: %v", err)
		}
		fmt.Printf("[workers] WARN partial failure: %v\n", err)
	}
	fmt.Printf("[workers] populate: scanned=%d embedded=%d added=%d failed=%d wall=%v\n",
		stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches,
		stats.Wall.Round(time.Millisecond))
}