Generalizes the staffing_500k driver's embed-and-push loop into
internal/corpusingest. Per docs/SPEC.md §3.4 component 1 (corpus
builders): adding a new staffing/code/playbook corpus is now one
Source impl + one main.go calling Run, not 200 lines of pipeline
copy-paste.
API:
type Source interface { Next() (Row, error) }
func Run(ctx, Config, Source) (Stats, error)
Library owns:
- Index lifecycle (create, optional drop-existing, idempotent
reuse on 409)
- Parallel embed dispatcher (configurable workers + batch size)
- Vectord push batching
- Progress logging + Stats reporting
- Partial-failure semantics (log + continue per-batch errors;
operator decides on re-run via Stats.Embedded vs Scanned delta)
Per-corpus driver owns: source parsing + column→Row mapping +
post-ingest validation queries.
Refactor scripts/staffing_500k/main.go to use it. Driver is now
~190 lines (was 339), with the embed/add plumbing replaced by one
Run call. -drop flag added so callers can opt out of the destructive
DELETE-first behavior (default still true to keep the 500K test
clean-recall semantics).
Unit tests (internal/corpusingest/ingest_test.go, 8/8 PASS):
- Pipeline shape: 50 rows / 16 batch → 4 embed + 4 add calls,
every ID added exactly once, vectors at correct dimension
- DropExisting fires DELETE
- 409 on create → reuse existing index
- Limit stops early
- Empty Text rows skipped (counted as scanned, not added)
- Required IndexName + Dimension validation
- Context cancel stops mid-pipeline
Real bug caught and fixed by the test suite: if embedd ever returns
fewer vectors than texts in the request (degraded backend), the
addBatch loop would panic with index-out-of-range. Worker now
length-checks the response and logs+skips on mismatch.
12-smoke regression sweep all green (D1-D6, G1, G1P, G2,
storaged_cap, pathway, matrix). vet clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
412 lines
12 KiB
Go
// Package corpusingest is the generalized text→vector ingestion
|
|
// pipeline. Originally extracted from scripts/staffing_500k/main.go;
|
|
// reusable by any corpus-builder script that needs to embed a stream
|
|
// of (id, text, metadata) rows and push them into a vectord index.
|
|
//
|
|
// Design: per-corpus Source impls own the parsing/column-mapping;
|
|
// this package owns the parallel-embed dispatcher, batching, vectord
|
|
// index lifecycle, and progress reporting. Adding a corpus is one
|
|
// Source struct + one main.go that calls Run; no copy-pasted pipeline.
|
|
//
|
|
// Per docs/SPEC.md §3.4 component 1 (corpus builders): this is the
|
|
// substrate the rest of the matrix indexer's value depends on. Get
|
|
// the pipeline right, then iterate on builders.
|
|
package corpusingest
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Row is one logical document in a corpus. Metadata may be any
// JSON-marshalable value (struct, map, json.RawMessage); the library
// marshals once per row before pushing to vectord.
type Row struct {
	ID       string // unique document id; an empty ID aborts the run
	Text     string // text to embed; an empty Text is skipped with a warning
	Metadata any    // optional payload stored alongside the vector; nil = no metadata
}
|
|
|
|
// Source produces a stream of rows. Source lifecycle (open/close) is
// owned by the caller; this package only consumes Next() until io.EOF.
type Source interface {
	// Next returns the next row or io.EOF when the source is drained.
	// Other errors cause Run to abort with the error wrapped.
	Next() (Row, error)
}
|
|
|
|
// Config drives one Run. Defaults match the Ollama-on-A4000 sweet
// spot from the 500K validation; override per-deployment if needed.
// Zero values are filled in by applyDefaults; IndexName and Dimension
// are the only fields callers must set.
type Config struct {
	GatewayURL string // default "http://127.0.0.1:3110"
	IndexName  string // required
	Dimension  int    // required, must match the embed model output
	Distance   string // default "cosine"
	EmbedModel string // optional; empty = embedd's default

	EmbedBatch   int // default 16, texts per /v1/embed call
	EmbedWorkers int // default 8, parallel embed goroutines
	AddBatch     int // default 1000, items per /v1/vectors/index/add call

	Limit        int  // 0 = no limit (process all rows)
	DropExisting bool // true = DELETE index first; false = idempotent reuse

	HTTPClient *http.Client // default: &http.Client{Timeout: 5 * time.Minute}

	// LogProgress is the interval between progress logs. 0 disables.
	LogProgress time.Duration
}
|
|
|
|
// Stats reports run outcomes. Embedded < Scanned indicates skipped
// empty-text rows and/or failed embed batches; Added < Embedded
// indicates failed add batches.
type Stats struct {
	Scanned  int64         // rows pulled from the Source (including skipped empty-text rows)
	Embedded int64         // texts successfully embedded
	Added    int64         // vectors successfully pushed to vectord
	Wall     time.Duration // wall-clock duration of the whole run
}
|
|
|
|
// Run executes the ingest pipeline. Returns on source EOF after all
|
|
// in-flight jobs drain, on context cancellation, or on the first
|
|
// embed/add error (errors are logged via slog and the pipeline
|
|
// continues — partial-failure semantics; see comment inside).
|
|
func Run(ctx context.Context, cfg Config, src Source) (Stats, error) {
|
|
cfg = applyDefaults(cfg)
|
|
if err := validateConfig(cfg); err != nil {
|
|
return Stats{}, err
|
|
}
|
|
|
|
t0 := time.Now()
|
|
if err := prepareIndex(ctx, cfg); err != nil {
|
|
return Stats{}, fmt.Errorf("prepare index: %w", err)
|
|
}
|
|
|
|
jobs := make(chan job, cfg.EmbedWorkers*2)
|
|
|
|
var (
|
|
totalEmbedded int64
|
|
totalAdded int64
|
|
)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < cfg.EmbedWorkers; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := range jobs {
|
|
vecs, err := embedBatch(ctx, cfg, j.texts)
|
|
if err != nil {
|
|
// Partial-failure semantics: log + continue. A wedged
|
|
// embed batch shouldn't kill 8 workers' worth of
|
|
// progress; the operator decides whether to re-run
|
|
// based on the final Embedded vs Scanned delta.
|
|
slog.Warn("corpusingest: embed batch failed",
|
|
"index", cfg.IndexName, "items", len(j.texts), "err", err)
|
|
continue
|
|
}
|
|
// Defense against a degraded embed backend that returns
|
|
// fewer vectors than texts: vecs[i] would panic in
|
|
// addBatch otherwise. Caught by ContextCancel unit test.
|
|
if len(vecs) != len(j.ids) {
|
|
slog.Warn("corpusingest: embed returned wrong count",
|
|
"index", cfg.IndexName, "want", len(j.ids), "got", len(vecs))
|
|
continue
|
|
}
|
|
atomic.AddInt64(&totalEmbedded, int64(len(vecs)))
|
|
if err := addBatch(ctx, cfg, j.ids, vecs, j.metas); err != nil {
|
|
slog.Warn("corpusingest: add batch failed",
|
|
"index", cfg.IndexName, "items", len(j.ids), "err", err)
|
|
continue
|
|
}
|
|
atomic.AddInt64(&totalAdded, int64(len(j.ids)))
|
|
}
|
|
}()
|
|
}
|
|
|
|
progressDone := make(chan struct{})
|
|
if cfg.LogProgress > 0 {
|
|
ticker := time.NewTicker(cfg.LogProgress)
|
|
go func() {
|
|
defer close(progressDone)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
slog.Info("corpusingest: progress",
|
|
"index", cfg.IndexName,
|
|
"embedded", atomic.LoadInt64(&totalEmbedded),
|
|
"added", atomic.LoadInt64(&totalAdded))
|
|
case <-ctx.Done():
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
} else {
|
|
close(progressDone)
|
|
}
|
|
|
|
scanned, err := drainSource(ctx, cfg, src, jobs)
|
|
close(jobs)
|
|
wg.Wait()
|
|
<-progressDone
|
|
|
|
stats := Stats{
|
|
Scanned: scanned,
|
|
Embedded: atomic.LoadInt64(&totalEmbedded),
|
|
Added: atomic.LoadInt64(&totalAdded),
|
|
Wall: time.Since(t0),
|
|
}
|
|
if err != nil {
|
|
return stats, err
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
// drainSource pulls rows, batches them, and dispatches into jobs.
|
|
// Returns when source EOFs, ctx cancels, or limit is hit.
|
|
func drainSource(ctx context.Context, cfg Config, src Source, jobs chan<- job) (int64, error) {
|
|
curIDs := make([]string, 0, cfg.EmbedBatch)
|
|
curTexts := make([]string, 0, cfg.EmbedBatch)
|
|
curMetas := make([]json.RawMessage, 0, cfg.EmbedBatch)
|
|
|
|
flush := func() {
|
|
if len(curIDs) == 0 {
|
|
return
|
|
}
|
|
jobs <- job{ids: curIDs, texts: curTexts, metas: curMetas}
|
|
curIDs = make([]string, 0, cfg.EmbedBatch)
|
|
curTexts = make([]string, 0, cfg.EmbedBatch)
|
|
curMetas = make([]json.RawMessage, 0, cfg.EmbedBatch)
|
|
}
|
|
|
|
var scanned int64
|
|
for {
|
|
if ctx.Err() != nil {
|
|
flush()
|
|
return scanned, ctx.Err()
|
|
}
|
|
row, err := src.Next()
|
|
if err == io.EOF {
|
|
flush()
|
|
return scanned, nil
|
|
}
|
|
if err != nil {
|
|
flush()
|
|
return scanned, fmt.Errorf("source row %d: %w", scanned, err)
|
|
}
|
|
if row.ID == "" {
|
|
return scanned, fmt.Errorf("source row %d: empty id", scanned)
|
|
}
|
|
// Empty Text would 400 at embedd; skip-with-warn rather than
|
|
// abort the whole run — a stray empty row shouldn't kill 500K.
|
|
if row.Text == "" {
|
|
slog.Warn("corpusingest: skipping row with empty text",
|
|
"index", cfg.IndexName, "id", row.ID)
|
|
scanned++
|
|
continue
|
|
}
|
|
meta, err := marshalMeta(row.Metadata)
|
|
if err != nil {
|
|
return scanned, fmt.Errorf("row %s: marshal metadata: %w", row.ID, err)
|
|
}
|
|
curIDs = append(curIDs, row.ID)
|
|
curTexts = append(curTexts, row.Text)
|
|
curMetas = append(curMetas, meta)
|
|
scanned++
|
|
|
|
if len(curIDs) >= cfg.EmbedBatch {
|
|
flush()
|
|
}
|
|
if cfg.Limit > 0 && scanned >= int64(cfg.Limit) {
|
|
flush()
|
|
return scanned, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// job is the unit of work between drainSource and the embed workers.
// Internal type; kept small so the channel buffer doesn't bloat.
// Invariant: the three slices are always the same length (drainSource
// appends to all of them in lockstep).
type job struct {
	ids   []string
	texts []string
	metas []json.RawMessage
}
|
|
|
|
// marshalMeta turns a Row.Metadata value into the raw JSON pushed to
// vectord: nil stays nil (no metadata), a json.RawMessage passes
// through untouched, and anything else goes through json.Marshal.
func marshalMeta(v any) (json.RawMessage, error) {
	switch m := v.(type) {
	case nil:
		return nil, nil
	case json.RawMessage:
		return m, nil
	default:
		return json.Marshal(v)
	}
}
|
|
|
|
// prepareIndex creates the vectord index, optionally dropping a
|
|
// preexisting one. Idempotent on matching params: 409 from create is
|
|
// treated as "already exists, reuse." If DropExisting is set, DELETE
|
|
// fires first to give a clean slate.
|
|
func prepareIndex(ctx context.Context, cfg Config) error {
|
|
if cfg.DropExisting {
|
|
if err := httpDelete(ctx, cfg.HTTPClient,
|
|
cfg.GatewayURL+"/v1/vectors/index/"+cfg.IndexName); err != nil {
|
|
// 404 (not found) is fine — drop-existing is idempotent.
|
|
slog.Debug("corpusingest: drop existing", "err", err)
|
|
}
|
|
}
|
|
body, _ := json.Marshal(map[string]any{
|
|
"name": cfg.IndexName,
|
|
"dimension": cfg.Dimension,
|
|
"distance": cfg.Distance,
|
|
})
|
|
code, msg, err := httpPost(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/vectors/index", body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch code {
|
|
case http.StatusCreated:
|
|
slog.Info("corpusingest: created index",
|
|
"name", cfg.IndexName, "dim", cfg.Dimension, "distance", cfg.Distance)
|
|
case http.StatusConflict:
|
|
// Already exists — vectord didn't change params on conflict.
|
|
// Caller's responsibility to ensure existing dim/distance match.
|
|
slog.Info("corpusingest: index already exists, reusing", "name", cfg.IndexName)
|
|
default:
|
|
return fmt.Errorf("create index %d: %s", code, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func embedBatch(ctx context.Context, cfg Config, texts []string) ([][]float32, error) {
|
|
body := map[string]any{"texts": texts}
|
|
if cfg.EmbedModel != "" {
|
|
body["model"] = cfg.EmbedModel
|
|
}
|
|
bs, _ := json.Marshal(body)
|
|
code, msg, raw, err := httpPostRaw(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/embed", bs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if code != http.StatusOK {
|
|
return nil, fmt.Errorf("embed status %d: %s", code, msg)
|
|
}
|
|
var er struct {
|
|
Vectors [][]float32 `json:"vectors"`
|
|
}
|
|
if err := json.Unmarshal(raw, &er); err != nil {
|
|
return nil, fmt.Errorf("embed decode: %w", err)
|
|
}
|
|
return er.Vectors, nil
|
|
}
|
|
|
|
func addBatch(ctx context.Context, cfg Config, ids []string, vecs [][]float32, metas []json.RawMessage) error {
|
|
type addItem struct {
|
|
ID string `json:"id"`
|
|
Vector []float32 `json:"vector"`
|
|
Metadata json.RawMessage `json:"metadata,omitempty"`
|
|
}
|
|
// Add-batch may exceed cfg.AddBatch when EmbedBatch divides into it
|
|
// non-evenly; vectord handles that fine. Keep one HTTP per job.
|
|
items := make([]addItem, len(ids))
|
|
for i := range ids {
|
|
items[i] = addItem{ID: ids[i], Vector: vecs[i], Metadata: metas[i]}
|
|
}
|
|
bs, _ := json.Marshal(map[string]any{"items": items})
|
|
code, msg, err := httpPost(ctx, cfg.HTTPClient,
|
|
cfg.GatewayURL+"/v1/vectors/index/"+cfg.IndexName+"/add", bs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if code != http.StatusOK {
|
|
return fmt.Errorf("add status %d: %s", code, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ── HTTP helpers — small, no extra deps ─────────────────────────
|
|
|
|
func httpPost(ctx context.Context, hc *http.Client, url string, body []byte) (int, string, error) {
|
|
code, msg, _, err := httpPostRaw(ctx, hc, url, body)
|
|
return code, msg, err
|
|
}
|
|
|
|
// httpPostRaw issues a JSON POST and returns the status code, a
// <=256-byte preview of the body (for error messages), the full raw
// body, and any transport error. Status code 0 means the request
// never reached the server.
func httpPostRaw(ctx context.Context, hc *http.Client, url string, body []byte) (int, string, []byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return 0, "", nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := hc.Do(req)
	if err != nil {
		return 0, "", nil, err
	}
	defer resp.Body.Close()

	payload, err := io.ReadAll(resp.Body)
	if err != nil {
		return resp.StatusCode, "", nil, err
	}
	// Cap the preview so a huge response body doesn't bloat error
	// strings and log lines.
	preview := payload[:min(256, len(payload))]
	return resp.StatusCode, string(preview), payload, nil
}
|
|
|
|
// httpDelete issues a DELETE and returns nil on success or 404 (the
// resource being already gone counts as deleted); any other >=400
// status or transport failure is an error.
func httpDelete(ctx context.Context, hc *http.Client, url string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
	if err != nil {
		return err
	}
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Drain the body so the transport can reuse the connection.
	io.Copy(io.Discard, resp.Body)
	switch {
	case resp.StatusCode < 400, resp.StatusCode == http.StatusNotFound:
		return nil
	default:
		return fmt.Errorf("delete status %d", resp.StatusCode)
	}
}
|
|
|
|
// ── config validation + defaults ────────────────────────────────
|
|
|
|
func applyDefaults(cfg Config) Config {
|
|
if cfg.GatewayURL == "" {
|
|
cfg.GatewayURL = "http://127.0.0.1:3110"
|
|
}
|
|
if cfg.Distance == "" {
|
|
cfg.Distance = "cosine"
|
|
}
|
|
if cfg.EmbedBatch <= 0 {
|
|
cfg.EmbedBatch = 16
|
|
}
|
|
if cfg.EmbedWorkers <= 0 {
|
|
cfg.EmbedWorkers = 8
|
|
}
|
|
if cfg.AddBatch <= 0 {
|
|
cfg.AddBatch = 1000
|
|
}
|
|
if cfg.HTTPClient == nil {
|
|
cfg.HTTPClient = &http.Client{Timeout: 5 * time.Minute}
|
|
}
|
|
if cfg.LogProgress < 0 {
|
|
cfg.LogProgress = 0
|
|
}
|
|
return cfg
|
|
}
|
|
|
|
func validateConfig(cfg Config) error {
|
|
if cfg.IndexName == "" {
|
|
return errors.New("corpusingest: IndexName is required")
|
|
}
|
|
if cfg.Dimension <= 0 {
|
|
return errors.New("corpusingest: Dimension must be > 0")
|
|
}
|
|
return nil
|
|
}
|