Cross-lineage scrum review on the 12 commits of this session
(afbb506..06e7152) via Rust gateway :3100 with Opus + Kimi +
Qwen3-coder. Results:
Real findings landed:
1. Opus BLOCK — vectord BatchAdd intra-batch duplicates panic
coder/hnsw's "node not added" length-invariant. Fixed with
last-write-wins dedup inside BatchAdd before the pre-pass.
Regression test TestBatchAdd_IntraBatchDedup added.
2. Opus + Kimi convergent WARN — strings.Contains(err.Error(),
"status 404") was brittle string-matching to detect cold-
start playbook state. Fixed: ErrCorpusNotFound sentinel
returned by searchCorpus on HTTP 404; fetchPlaybookHits
uses errors.Is.
3. Opus WARN — corpusingest.Run returned nil on total batch
failure, masking broken pipelines as "empty corpora." Fixed:
Stats.FailedBatches counter, ErrPartialFailure sentinel
returned when nonzero. New regression test
TestRun_NonzeroFailedBatchesReturnsError.
4. Opus WARN — dead var _ = io.EOF in staffing_500k/main.go
was justified by a fictional comment. Removed.
Drivers (staffing_500k, staffing_candidates, staffing_workers)
updated to handle ErrPartialFailure gracefully — print warn, keep
running queries — rather than fatal'ing on transient hiccups
while still surfacing the failure clearly in the output.
Documented (no code change):
- Opus WARN: matrixd /matrix/downgrade reads
LH_FORCE_FULL_ENRICHMENT from process env when body omits
it. Comment now explains the opinionated default and points
callers wanting deterministic behavior to pass the field
explicitly.
False positives dismissed (caught and verified, NOT acted on):
A. Kimi BLOCK on errors.Is + wrapped error in cmd/matrixd:223.
Verified false: Search wraps with %w (fmt.Errorf("%w: %v",
ErrEmbed, err)), so errors.Is matches the chain correctly.
B. Kimi INFO "BatchAdd has no unit tests." Verified false:
batch_bench_test.go has BenchmarkBatchAdd; the new dedup
test TestBatchAdd_IntraBatchDedup adds another.
C. Opus BLOCK on missing finite/zero-norm pre-validation in
cmd/vectord:280-291. Verified false: line 272 already calls
vectord.ValidateVector before BatchAdd, so finite + zero-
norm IS checked. Pre-validation is exhaustive.
D. Opus WARN on relevance.go tokenRe (Opus self-corrected
mid-finding when realizing leading char counts toward token
length).
Qwen3-coder returned NO FINDINGS — known issue with very long
diffs through the OpenRouter free tier; lineage rotation worked
as designed (Opus + Kimi between them caught everything Qwen
would have).
15-smoke regression sweep all green (D1-D6, G1, G1P, G2,
storaged_cap, pathway, matrix, relevance, downgrade, playbook).
Unit tests all green (corpusingest +1, vectord +1).
Per feedback_cross_lineage_review.md: convergent finding #2 (404
detection) is the highest-signal one — both Opus and Kimi
flagged it independently. The other Opus findings stand on
single-reviewer signal but each one verified against the actual
code.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
438 lines
13 KiB
Go
438 lines
13 KiB
Go
// Package corpusingest is the generalized text→vector ingestion
|
|
// pipeline. Originally extracted from scripts/staffing_500k/main.go;
|
|
// reusable by any corpus-builder script that needs to embed a stream
|
|
// of (id, text, metadata) rows and push them into a vectord index.
|
|
//
|
|
// Design: per-corpus Source impls own the parsing/column-mapping;
|
|
// this package owns the parallel-embed dispatcher, batching, vectord
|
|
// index lifecycle, and progress reporting. Adding a corpus is one
|
|
// Source struct + one main.go that calls Run; no copy-pasted pipeline.
|
|
//
|
|
// Per docs/SPEC.md §3.4 component 1 (corpus builders): this is the
|
|
// substrate the rest of the matrix indexer's value depends on. Get
|
|
// the pipeline right, then iterate on builders.
|
|
package corpusingest
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Row is one logical document in a corpus. Metadata may be any
|
|
// JSON-marshalable value (struct, map, json.RawMessage); the library
|
|
// marshals once per row before pushing to vectord.
|
|
type Row struct {
|
|
ID string
|
|
Text string
|
|
Metadata any
|
|
}
|
|
|
|
// Source produces a stream of rows. Source lifecycle (open/close) is
|
|
// owned by the caller; this package only consumes Next() until io.EOF.
|
|
type Source interface {
|
|
// Next returns the next row or io.EOF when the source is drained.
|
|
// Other errors cause Run to abort with the error wrapped.
|
|
Next() (Row, error)
|
|
}
|
|
|
|
// Config drives one Run. Defaults match the Ollama-on-A4000 sweet
|
|
// spot from the 500K validation; override per-deployment if needed.
|
|
type Config struct {
|
|
GatewayURL string // default "http://127.0.0.1:3110"
|
|
IndexName string // required
|
|
Dimension int // required, must match the embed model output
|
|
Distance string // default "cosine"
|
|
EmbedModel string // optional; empty = embedd's default
|
|
EmbedBatch int // default 16, texts per /v1/embed call
|
|
EmbedWorkers int // default 8, parallel embed goroutines
|
|
AddBatch int // default 1000, items per /v1/vectors/index/add call
|
|
Limit int // 0 = no limit (process all rows)
|
|
DropExisting bool // true = DELETE index first; false = idempotent reuse
|
|
HTTPClient *http.Client
|
|
// LogProgress is the interval between progress logs. 0 disables.
|
|
LogProgress time.Duration
|
|
}
|
|
|
|
// Stats reports run outcomes. FailedBatches counts embed-or-add
|
|
// batches that errored out and were skipped (partial-failure
|
|
// semantics). When non-zero, Run returns ErrPartialFailure so
|
|
// callers can't accidentally treat "1 of 313 batches succeeded"
|
|
// as a successful run.
|
|
type Stats struct {
|
|
Scanned int64
|
|
Embedded int64
|
|
Added int64
|
|
Wall time.Duration
|
|
FailedBatches int64
|
|
}
|
|
|
|
// ErrPartialFailure signals that one or more batches errored during
|
|
// Run. Stats.FailedBatches has the count; the caller decides
|
|
// whether to retry / log / abort. Per 2026-04-29 cross-lineage
|
|
// scrum (Opus WARN): the original behavior returned nil even when
|
|
// 100% of batches failed silently, making "embedded=0/scanned=N"
|
|
// look like an empty corpus rather than a broken pipeline.
|
|
var ErrPartialFailure = errors.New("corpusingest: one or more batches failed")
|
|
|
|
// Run executes the ingest pipeline. Returns on source EOF after all
|
|
// in-flight jobs drain, on context cancellation, or on the first
|
|
// embed/add error (errors are logged via slog and the pipeline
|
|
// continues — partial-failure semantics; see comment inside).
|
|
func Run(ctx context.Context, cfg Config, src Source) (Stats, error) {
|
|
cfg = applyDefaults(cfg)
|
|
if err := validateConfig(cfg); err != nil {
|
|
return Stats{}, err
|
|
}
|
|
|
|
t0 := time.Now()
|
|
if err := prepareIndex(ctx, cfg); err != nil {
|
|
return Stats{}, fmt.Errorf("prepare index: %w", err)
|
|
}
|
|
|
|
jobs := make(chan job, cfg.EmbedWorkers*2)
|
|
|
|
var (
|
|
totalEmbedded int64
|
|
totalAdded int64
|
|
failedBatches int64
|
|
)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < cfg.EmbedWorkers; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := range jobs {
|
|
vecs, err := embedBatch(ctx, cfg, j.texts)
|
|
if err != nil {
|
|
// Partial-failure semantics: log + continue. A wedged
|
|
// embed batch shouldn't kill 8 workers' worth of
|
|
// progress; Run returns ErrPartialFailure on any
|
|
// failure so callers can't miss the signal.
|
|
slog.Warn("corpusingest: embed batch failed",
|
|
"index", cfg.IndexName, "items", len(j.texts), "err", err)
|
|
atomic.AddInt64(&failedBatches, 1)
|
|
continue
|
|
}
|
|
// Defense against a degraded embed backend that returns
|
|
// fewer vectors than texts: vecs[i] would panic in
|
|
// addBatch otherwise. Caught by ContextCancel unit test.
|
|
if len(vecs) != len(j.ids) {
|
|
slog.Warn("corpusingest: embed returned wrong count",
|
|
"index", cfg.IndexName, "want", len(j.ids), "got", len(vecs))
|
|
atomic.AddInt64(&failedBatches, 1)
|
|
continue
|
|
}
|
|
atomic.AddInt64(&totalEmbedded, int64(len(vecs)))
|
|
if err := addBatch(ctx, cfg, j.ids, vecs, j.metas); err != nil {
|
|
slog.Warn("corpusingest: add batch failed",
|
|
"index", cfg.IndexName, "items", len(j.ids), "err", err)
|
|
atomic.AddInt64(&failedBatches, 1)
|
|
continue
|
|
}
|
|
atomic.AddInt64(&totalAdded, int64(len(j.ids)))
|
|
}
|
|
}()
|
|
}
|
|
|
|
stopProgress := make(chan struct{})
|
|
progressDone := make(chan struct{})
|
|
if cfg.LogProgress > 0 {
|
|
go func() {
|
|
defer close(progressDone)
|
|
ticker := time.NewTicker(cfg.LogProgress)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
slog.Info("corpusingest: progress",
|
|
"index", cfg.IndexName,
|
|
"embedded", atomic.LoadInt64(&totalEmbedded),
|
|
"added", atomic.LoadInt64(&totalAdded))
|
|
case <-stopProgress:
|
|
return
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
} else {
|
|
close(progressDone)
|
|
}
|
|
|
|
scanned, err := drainSource(ctx, cfg, src, jobs)
|
|
close(jobs)
|
|
wg.Wait()
|
|
close(stopProgress) // tell the progress goroutine to exit; would otherwise hang Run forever (caught by candidates e2e 2026-04-29)
|
|
<-progressDone
|
|
|
|
stats := Stats{
|
|
Scanned: scanned,
|
|
Embedded: atomic.LoadInt64(&totalEmbedded),
|
|
Added: atomic.LoadInt64(&totalAdded),
|
|
Wall: time.Since(t0),
|
|
FailedBatches: atomic.LoadInt64(&failedBatches),
|
|
}
|
|
if err != nil {
|
|
return stats, err
|
|
}
|
|
if stats.FailedBatches > 0 {
|
|
return stats, fmt.Errorf("%w: %d batches failed (embedded=%d added=%d scanned=%d)",
|
|
ErrPartialFailure, stats.FailedBatches, stats.Embedded, stats.Added, stats.Scanned)
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
// drainSource pulls rows, batches them, and dispatches into jobs.
|
|
// Returns when source EOFs, ctx cancels, or limit is hit.
|
|
func drainSource(ctx context.Context, cfg Config, src Source, jobs chan<- job) (int64, error) {
|
|
curIDs := make([]string, 0, cfg.EmbedBatch)
|
|
curTexts := make([]string, 0, cfg.EmbedBatch)
|
|
curMetas := make([]json.RawMessage, 0, cfg.EmbedBatch)
|
|
|
|
flush := func() {
|
|
if len(curIDs) == 0 {
|
|
return
|
|
}
|
|
jobs <- job{ids: curIDs, texts: curTexts, metas: curMetas}
|
|
curIDs = make([]string, 0, cfg.EmbedBatch)
|
|
curTexts = make([]string, 0, cfg.EmbedBatch)
|
|
curMetas = make([]json.RawMessage, 0, cfg.EmbedBatch)
|
|
}
|
|
|
|
var scanned int64
|
|
for {
|
|
if ctx.Err() != nil {
|
|
flush()
|
|
return scanned, ctx.Err()
|
|
}
|
|
row, err := src.Next()
|
|
if err == io.EOF {
|
|
flush()
|
|
return scanned, nil
|
|
}
|
|
if err != nil {
|
|
flush()
|
|
return scanned, fmt.Errorf("source row %d: %w", scanned, err)
|
|
}
|
|
if row.ID == "" {
|
|
return scanned, fmt.Errorf("source row %d: empty id", scanned)
|
|
}
|
|
// Empty Text would 400 at embedd; skip-with-warn rather than
|
|
// abort the whole run — a stray empty row shouldn't kill 500K.
|
|
if row.Text == "" {
|
|
slog.Warn("corpusingest: skipping row with empty text",
|
|
"index", cfg.IndexName, "id", row.ID)
|
|
scanned++
|
|
continue
|
|
}
|
|
meta, err := marshalMeta(row.Metadata)
|
|
if err != nil {
|
|
return scanned, fmt.Errorf("row %s: marshal metadata: %w", row.ID, err)
|
|
}
|
|
curIDs = append(curIDs, row.ID)
|
|
curTexts = append(curTexts, row.Text)
|
|
curMetas = append(curMetas, meta)
|
|
scanned++
|
|
|
|
if len(curIDs) >= cfg.EmbedBatch {
|
|
flush()
|
|
}
|
|
if cfg.Limit > 0 && scanned >= int64(cfg.Limit) {
|
|
flush()
|
|
return scanned, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// job is the unit of work between drainSource and the embed workers.
|
|
// Internal type; kept small so the channel buffer doesn't bloat.
|
|
type job struct {
|
|
ids []string
|
|
texts []string
|
|
metas []json.RawMessage
|
|
}
|
|
|
|
func marshalMeta(v any) (json.RawMessage, error) {
|
|
if v == nil {
|
|
return nil, nil
|
|
}
|
|
if rm, ok := v.(json.RawMessage); ok {
|
|
return rm, nil
|
|
}
|
|
return json.Marshal(v)
|
|
}
|
|
|
|
// prepareIndex creates the vectord index, optionally dropping a
|
|
// preexisting one. Idempotent on matching params: 409 from create is
|
|
// treated as "already exists, reuse." If DropExisting is set, DELETE
|
|
// fires first to give a clean slate.
|
|
func prepareIndex(ctx context.Context, cfg Config) error {
|
|
if cfg.DropExisting {
|
|
if err := httpDelete(ctx, cfg.HTTPClient,
|
|
cfg.GatewayURL+"/v1/vectors/index/"+cfg.IndexName); err != nil {
|
|
// 404 (not found) is fine — drop-existing is idempotent.
|
|
slog.Debug("corpusingest: drop existing", "err", err)
|
|
}
|
|
}
|
|
body, _ := json.Marshal(map[string]any{
|
|
"name": cfg.IndexName,
|
|
"dimension": cfg.Dimension,
|
|
"distance": cfg.Distance,
|
|
})
|
|
code, msg, err := httpPost(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/vectors/index", body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch code {
|
|
case http.StatusCreated:
|
|
slog.Info("corpusingest: created index",
|
|
"name", cfg.IndexName, "dim", cfg.Dimension, "distance", cfg.Distance)
|
|
case http.StatusConflict:
|
|
// Already exists — vectord didn't change params on conflict.
|
|
// Caller's responsibility to ensure existing dim/distance match.
|
|
slog.Info("corpusingest: index already exists, reusing", "name", cfg.IndexName)
|
|
default:
|
|
return fmt.Errorf("create index %d: %s", code, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func embedBatch(ctx context.Context, cfg Config, texts []string) ([][]float32, error) {
|
|
body := map[string]any{"texts": texts}
|
|
if cfg.EmbedModel != "" {
|
|
body["model"] = cfg.EmbedModel
|
|
}
|
|
bs, _ := json.Marshal(body)
|
|
code, msg, raw, err := httpPostRaw(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/embed", bs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if code != http.StatusOK {
|
|
return nil, fmt.Errorf("embed status %d: %s", code, msg)
|
|
}
|
|
var er struct {
|
|
Vectors [][]float32 `json:"vectors"`
|
|
}
|
|
if err := json.Unmarshal(raw, &er); err != nil {
|
|
return nil, fmt.Errorf("embed decode: %w", err)
|
|
}
|
|
return er.Vectors, nil
|
|
}
|
|
|
|
func addBatch(ctx context.Context, cfg Config, ids []string, vecs [][]float32, metas []json.RawMessage) error {
|
|
type addItem struct {
|
|
ID string `json:"id"`
|
|
Vector []float32 `json:"vector"`
|
|
Metadata json.RawMessage `json:"metadata,omitempty"`
|
|
}
|
|
// Add-batch may exceed cfg.AddBatch when EmbedBatch divides into it
|
|
// non-evenly; vectord handles that fine. Keep one HTTP per job.
|
|
items := make([]addItem, len(ids))
|
|
for i := range ids {
|
|
items[i] = addItem{ID: ids[i], Vector: vecs[i], Metadata: metas[i]}
|
|
}
|
|
bs, _ := json.Marshal(map[string]any{"items": items})
|
|
code, msg, err := httpPost(ctx, cfg.HTTPClient,
|
|
cfg.GatewayURL+"/v1/vectors/index/"+cfg.IndexName+"/add", bs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if code != http.StatusOK {
|
|
return fmt.Errorf("add status %d: %s", code, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ── HTTP helpers — small, no extra deps ─────────────────────────
|
|
|
|
func httpPost(ctx context.Context, hc *http.Client, url string, body []byte) (int, string, error) {
|
|
code, msg, _, err := httpPostRaw(ctx, hc, url, body)
|
|
return code, msg, err
|
|
}
|
|
|
|
func httpPostRaw(ctx context.Context, hc *http.Client, url string, body []byte) (int, string, []byte, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return 0, "", nil, err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
resp, err := hc.Do(req)
|
|
if err != nil {
|
|
return 0, "", nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
raw, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return resp.StatusCode, "", nil, err
|
|
}
|
|
preview := raw
|
|
if len(preview) > 256 {
|
|
preview = preview[:256]
|
|
}
|
|
return resp.StatusCode, string(preview), raw, nil
|
|
}
|
|
|
|
func httpDelete(ctx context.Context, hc *http.Client, url string) error {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp, err := hc.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
io.Copy(io.Discard, resp.Body)
|
|
if resp.StatusCode >= 400 && resp.StatusCode != http.StatusNotFound {
|
|
return fmt.Errorf("delete status %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ── config validation + defaults ────────────────────────────────
|
|
|
|
func applyDefaults(cfg Config) Config {
|
|
if cfg.GatewayURL == "" {
|
|
cfg.GatewayURL = "http://127.0.0.1:3110"
|
|
}
|
|
if cfg.Distance == "" {
|
|
cfg.Distance = "cosine"
|
|
}
|
|
if cfg.EmbedBatch <= 0 {
|
|
cfg.EmbedBatch = 16
|
|
}
|
|
if cfg.EmbedWorkers <= 0 {
|
|
cfg.EmbedWorkers = 8
|
|
}
|
|
if cfg.AddBatch <= 0 {
|
|
cfg.AddBatch = 1000
|
|
}
|
|
if cfg.HTTPClient == nil {
|
|
cfg.HTTPClient = &http.Client{Timeout: 5 * time.Minute}
|
|
}
|
|
if cfg.LogProgress < 0 {
|
|
cfg.LogProgress = 0
|
|
}
|
|
return cfg
|
|
}
|
|
|
|
func validateConfig(cfg Config) error {
|
|
if cfg.IndexName == "" {
|
|
return errors.New("corpusingest: IndexName is required")
|
|
}
|
|
if cfg.Dimension <= 0 {
|
|
return errors.New("corpusingest: Dimension must be > 0")
|
|
}
|
|
return nil
|
|
}
|