// Package corpusingest is the generalized text→vector ingestion
// pipeline. Originally extracted from scripts/staffing_500k/main.go;
// reusable by any corpus-builder script that needs to embed a stream
// of (id, text, metadata) rows and push them into a vectord index.
//
// Design: per-corpus Source impls own the parsing/column-mapping;
// this package owns the parallel-embed dispatcher, batching, vectord
// index lifecycle, and progress reporting. Adding a corpus is one
// Source struct + one main.go that calls Run; no copy-pasted pipeline.
//
// Per docs/SPEC.md §3.4 component 1 (corpus builders): this is the
// substrate the rest of the matrix indexer's value depends on. Get
// the pipeline right, then iterate on builders.
package corpusingest

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"sync/atomic"
	"time"
)

// Row is one logical document in a corpus. Metadata may be any
// JSON-marshalable value (struct, map, json.RawMessage); the library
// marshals once per row before pushing to vectord.
type Row struct {
	ID       string
	Text     string
	Metadata any
}

// Source produces a stream of rows. Source lifecycle (open/close) is
// owned by the caller; this package only consumes Next() until io.EOF.
type Source interface {
	// Next returns the next row or io.EOF when the source is drained.
	// Other errors cause Run to abort with the error wrapped.
	Next() (Row, error)
}
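// sliceSource is an illustrative minimal Source, not part of the
// package API: a real corpus builder would stream from a file or
// database, but the Next/io.EOF contract is exactly the same.
type sliceSource struct {
	rows []Row
	pos  int
}

// Next returns the next buffered row, or io.EOF once the slice is
// exhausted, satisfying the Source contract above.
func (s *sliceSource) Next() (Row, error) {
	if s.pos >= len(s.rows) {
		return Row{}, io.EOF
	}
	r := s.rows[s.pos]
	s.pos++
	return r, nil
}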
// Config drives one Run. Defaults match the Ollama-on-A4000 sweet
// spot from the 500K validation; override per-deployment if needed.
type Config struct {
	GatewayURL   string // default "http://127.0.0.1:3110"
	IndexName    string // required
	Dimension    int    // required, must match the embed model output
	Distance     string // default "cosine"
	EmbedModel   string // optional; empty = embedd's default
	EmbedBatch   int    // default 16, texts per /v1/embed call
	EmbedWorkers int    // default 8, parallel embed goroutines
	AddBatch     int    // default 1000, items per /v1/vectors/index/add call (see addBatch)
	Limit        int    // 0 = no limit (process all rows)
	DropExisting bool   // true = DELETE index first; false = idempotent reuse
	HTTPClient   *http.Client

	// LogProgress is the interval between progress logs. 0 disables.
	LogProgress time.Duration
}

// Stats reports run outcomes.
type Stats struct {
	Scanned  int64
	Embedded int64
	Added    int64
	Wall     time.Duration
}

// Run executes the ingest pipeline. It returns on source EOF after
// all in-flight jobs drain, on context cancellation, or on the first
// source error. Embed/add errors do not abort the run: they are
// logged via slog and the pipeline continues (partial-failure
// semantics; see the comment inside).
func Run(ctx context.Context, cfg Config, src Source) (Stats, error) {
	cfg = applyDefaults(cfg)
	if err := validateConfig(cfg); err != nil {
		return Stats{}, err
	}
	t0 := time.Now()

	if err := prepareIndex(ctx, cfg); err != nil {
		return Stats{}, fmt.Errorf("prepare index: %w", err)
	}

	jobs := make(chan job, cfg.EmbedWorkers*2)
	var (
		totalEmbedded int64
		totalAdded    int64
	)

	var wg sync.WaitGroup
	for i := 0; i < cfg.EmbedWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				vecs, err := embedBatch(ctx, cfg, j.texts)
				if err != nil {
					// Partial-failure semantics: log + continue. A wedged
					// embed batch shouldn't kill 8 workers' worth of
					// progress; the operator decides whether to re-run
					// based on the final Embedded vs Scanned delta.
					slog.Warn("corpusingest: embed batch failed",
						"index", cfg.IndexName, "items", len(j.texts), "err", err)
					continue
				}
				// Defense against a degraded embed backend that returns
				// fewer vectors than texts: vecs[i] would panic in
				// addBatch otherwise. Caught by ContextCancel unit test.
				if len(vecs) != len(j.ids) {
					slog.Warn("corpusingest: embed returned wrong count",
						"index", cfg.IndexName, "want", len(j.ids), "got", len(vecs))
					continue
				}
				atomic.AddInt64(&totalEmbedded, int64(len(vecs)))

				if err := addBatch(ctx, cfg, j.ids, vecs, j.metas); err != nil {
					slog.Warn("corpusingest: add batch failed",
						"index", cfg.IndexName, "items", len(j.ids), "err", err)
					continue
				}
				atomic.AddInt64(&totalAdded, int64(len(j.ids)))
			}
		}()
	}

	stopProgress := make(chan struct{})
	progressDone := make(chan struct{})
	if cfg.LogProgress > 0 {
		go func() {
			defer close(progressDone)
			ticker := time.NewTicker(cfg.LogProgress)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					slog.Info("corpusingest: progress",
						"index", cfg.IndexName,
						"embedded", atomic.LoadInt64(&totalEmbedded),
						"added", atomic.LoadInt64(&totalAdded))
				case <-stopProgress:
					return
				case <-ctx.Done():
					return
				}
			}
		}()
	} else {
		close(progressDone)
	}

	scanned, err := drainSource(ctx, cfg, src, jobs)
	close(jobs)
	wg.Wait()
	// Tell the progress goroutine to exit; without this, <-progressDone
	// below would hang Run forever (caught by candidates e2e 2026-04-29).
	close(stopProgress)
	<-progressDone

	stats := Stats{
		Scanned:  scanned,
		Embedded: atomic.LoadInt64(&totalEmbedded),
		Added:    atomic.LoadInt64(&totalAdded),
		Wall:     time.Since(t0),
	}
	if err != nil {
		return stats, err
	}
	return stats, nil
}
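// exampleRun is an illustrative sketch of the one-main.go-per-corpus
// wiring described in the package doc. It is not part of the package
// API; the index name, dimension, and row values are placeholders.
func exampleRun(ctx context.Context) error {
	src := &sliceSource{rows: []Row{
		{ID: "doc-1", Text: "first document", Metadata: map[string]string{"lang": "en"}},
	}}
	stats, err := Run(ctx, Config{
		IndexName:   "demo_corpus", // placeholder
		Dimension:   768,           // must match the embed model output
		LogProgress: 30 * time.Second,
	}, src) // GatewayURL, batch sizes, workers: defaults apply
	if err != nil {
		return err
	}
	// Embedded < Scanned signals partial failure; the operator decides
	// whether to re-run (see the comment in the worker loop above).
	slog.Info("corpusingest: example run finished",
		"scanned", stats.Scanned, "embedded", stats.Embedded,
		"added", stats.Added, "wall", stats.Wall)
	return nil
}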
// drainSource pulls rows, batches them, and dispatches into jobs.
// Returns when source EOFs, ctx cancels, or limit is hit.
func drainSource(ctx context.Context, cfg Config, src Source, jobs chan<- job) (int64, error) {
	curIDs := make([]string, 0, cfg.EmbedBatch)
	curTexts := make([]string, 0, cfg.EmbedBatch)
	curMetas := make([]json.RawMessage, 0, cfg.EmbedBatch)

	flush := func() {
		if len(curIDs) == 0 {
			return
		}
		jobs <- job{ids: curIDs, texts: curTexts, metas: curMetas}
		curIDs = make([]string, 0, cfg.EmbedBatch)
		curTexts = make([]string, 0, cfg.EmbedBatch)
		curMetas = make([]json.RawMessage, 0, cfg.EmbedBatch)
	}

	var scanned int64
	for {
		if ctx.Err() != nil {
			flush()
			return scanned, ctx.Err()
		}
		row, err := src.Next()
		if err == io.EOF {
			flush()
			return scanned, nil
		}
		if err != nil {
			flush()
			return scanned, fmt.Errorf("source row %d: %w", scanned, err)
		}
		if row.ID == "" {
			return scanned, fmt.Errorf("source row %d: empty id", scanned)
		}
		// Empty Text would 400 at embedd; skip-with-warn rather than
		// abort the whole run — a stray empty row shouldn't kill 500K.
		if row.Text == "" {
			slog.Warn("corpusingest: skipping row with empty text",
				"index", cfg.IndexName, "id", row.ID)
			scanned++
			continue
		}
		meta, err := marshalMeta(row.Metadata)
		if err != nil {
			return scanned, fmt.Errorf("row %s: marshal metadata: %w", row.ID, err)
		}

		curIDs = append(curIDs, row.ID)
		curTexts = append(curTexts, row.Text)
		curMetas = append(curMetas, meta)
		scanned++

		if len(curIDs) >= cfg.EmbedBatch {
			flush()
		}
		if cfg.Limit > 0 && scanned >= int64(cfg.Limit) {
			flush()
			return scanned, nil
		}
	}
}

// job is the unit of work between drainSource and the embed workers.
// Internal type; kept small so the channel buffer doesn't bloat.
type job struct {
	ids   []string
	texts []string
	metas []json.RawMessage
}

func marshalMeta(v any) (json.RawMessage, error) {
	if v == nil {
		return nil, nil
	}
	if rm, ok := v.(json.RawMessage); ok {
		return rm, nil
	}
	return json.Marshal(v)
}
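// The accepted Metadata shapes, for illustration (values made up):
//
//	Row{Metadata: struct{ Year int }{2024}}          // marshaled → {"Year":2024}
//	Row{Metadata: map[string]any{"team": "search"}}  // marshaled → {"team":"search"}
//	Row{Metadata: json.RawMessage(`{"k":1}`)}        // passed through, no re-marshal
//	Row{Metadata: nil}                               // omitted from the add payload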
// prepareIndex creates the vectord index, optionally dropping a
// preexisting one. Idempotent on matching params: 409 from create is
// treated as "already exists, reuse." If DropExisting is set, DELETE
// fires first to give a clean slate.
func prepareIndex(ctx context.Context, cfg Config) error {
	if cfg.DropExisting {
		if err := httpDelete(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/vectors/index/"+cfg.IndexName); err != nil {
			// 404 (not found) is fine — drop-existing is idempotent.
			slog.Debug("corpusingest: drop existing", "err", err)
		}
	}
	body, _ := json.Marshal(map[string]any{
		"name":      cfg.IndexName,
		"dimension": cfg.Dimension,
		"distance":  cfg.Distance,
	})
	code, msg, err := httpPost(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/vectors/index", body)
	if err != nil {
		return err
	}
	switch code {
	case http.StatusCreated:
		slog.Info("corpusingest: created index",
			"name", cfg.IndexName, "dim", cfg.Dimension, "distance", cfg.Distance)
	case http.StatusConflict:
		// Already exists — vectord didn't change params on conflict.
		// Caller's responsibility to ensure existing dim/distance match.
		slog.Info("corpusingest: index already exists, reusing", "name", cfg.IndexName)
	default:
		return fmt.Errorf("create index %d: %s", code, msg)
	}
	return nil
}

func embedBatch(ctx context.Context, cfg Config, texts []string) ([][]float32, error) {
	body := map[string]any{"texts": texts}
	if cfg.EmbedModel != "" {
		body["model"] = cfg.EmbedModel
	}
	bs, _ := json.Marshal(body)
	code, msg, raw, err := httpPostRaw(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/embed", bs)
	if err != nil {
		return nil, err
	}
	if code != http.StatusOK {
		return nil, fmt.Errorf("embed status %d: %s", code, msg)
	}
	var er struct {
		Vectors [][]float32 `json:"vectors"`
	}
	if err := json.Unmarshal(raw, &er); err != nil {
		return nil, fmt.Errorf("embed decode: %w", err)
	}
	return er.Vectors, nil
}

func addBatch(ctx context.Context, cfg Config, ids []string, vecs [][]float32, metas []json.RawMessage) error {
	type addItem struct {
		ID       string          `json:"id"`
		Vector   []float32       `json:"vector"`
		Metadata json.RawMessage `json:"metadata,omitempty"`
	}
	// One HTTP call per embed job: each call carries one job's worth of
	// items (at most cfg.EmbedBatch), so it can exceed cfg.AddBatch if
	// EmbedBatch is configured larger; vectord handles that fine.
	items := make([]addItem, len(ids))
	for i := range ids {
		items[i] = addItem{ID: ids[i], Vector: vecs[i], Metadata: metas[i]}
	}
	bs, _ := json.Marshal(map[string]any{"items": items})
	code, msg, err := httpPost(ctx, cfg.HTTPClient, cfg.GatewayURL+"/v1/vectors/index/"+cfg.IndexName+"/add", bs)
	if err != nil {
		return err
	}
	if code != http.StatusOK {
		return fmt.Errorf("add status %d: %s", code, msg)
	}
	return nil
}

// ── HTTP helpers — small, no extra deps ─────────────────────────

func httpPost(ctx context.Context, hc *http.Client, url string, body []byte) (int, string, error) {
	code, msg, _, err := httpPostRaw(ctx, hc, url, body)
	return code, msg, err
}

func httpPostRaw(ctx context.Context, hc *http.Client, url string, body []byte) (int, string, []byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return 0, "", nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return 0, "", nil, err
	}
	defer resp.Body.Close()
	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return resp.StatusCode, "", nil, err
	}
	preview := raw
	if len(preview) > 256 {
		preview = preview[:256]
	}
	return resp.StatusCode, string(preview), raw, nil
}

func httpDelete(ctx context.Context, hc *http.Client, url string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
	if err != nil {
		return err
	}
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body)
	if resp.StatusCode >= 400 && resp.StatusCode != http.StatusNotFound {
		return fmt.Errorf("delete status %d", resp.StatusCode)
	}
	return nil
}

// ── config validation + defaults ────────────────────────────────

func applyDefaults(cfg Config) Config {
	if cfg.GatewayURL == "" {
		cfg.GatewayURL = "http://127.0.0.1:3110"
	}
	if cfg.Distance == "" {
		cfg.Distance = "cosine"
	}
	if cfg.EmbedBatch <= 0 {
		cfg.EmbedBatch = 16
	}
	if cfg.EmbedWorkers <= 0 {
		cfg.EmbedWorkers = 8
	}
	if cfg.AddBatch <= 0 {
		cfg.AddBatch = 1000
	}
	if cfg.HTTPClient == nil {
		cfg.HTTPClient = &http.Client{Timeout: 5 * time.Minute}
	}
	if cfg.LogProgress < 0 {
		cfg.LogProgress = 0
	}
	return cfg
}

func validateConfig(cfg Config) error {
	if cfg.IndexName == "" {
		return errors.New("corpusingest: IndexName is required")
	}
	if cfg.Dimension <= 0 {
		return errors.New("corpusingest: Dimension must be > 0")
	}
	return nil
}
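// jsonlSource is an illustrative sketch of a file-backed Source for
// newline-delimited JSON. The field names "id", "text", and "meta"
// are assumptions, not a format this package defines — adjust the
// struct tags to your corpus. Per the package doc, parsing like this
// belongs in the corpus builder; it is not part of the package API.
// A caller would construct it as &jsonlSource{dec: json.NewDecoder(f)}
// and close f itself after Run returns.
type jsonlSource struct {
	dec *json.Decoder // caller owns the underlying reader/file handle
}

// Next decodes one JSONL record into a Row. json.Decoder returns
// io.EOF at end of input, which satisfies the Source contract as-is.
func (s *jsonlSource) Next() (Row, error) {
	var rec struct {
		ID   string          `json:"id"`
		Text string          `json:"text"`
		Meta json.RawMessage `json:"meta"`
	}
	if err := s.dec.Decode(&rec); err != nil {
		return Row{}, err // io.EOF passes through untouched
	}
	return Row{ID: rec.ID, Text: rec.Text, Metadata: rec.Meta}, nil
}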