diff --git a/cmd/matrixd/main.go b/cmd/matrixd/main.go index 2630660..6a0a49f 100644 --- a/cmd/matrixd/main.go +++ b/cmd/matrixd/main.go @@ -143,8 +143,14 @@ func (h *handlers) handlePlaybookRecord(w http.ResponseWriter, r *http.Request) } // downgradeRequest is the POST /matrix/downgrade body. Mirrors -// matrix.DowngradeInput; ForceFullOverride is read from the -// LH_FORCE_FULL_ENRICHMENT env var when omitted from the body. +// matrix.DowngradeInput. When ForceFullOverride is omitted from +// the body, the value falls back to matrixd's process env +// (LH_FORCE_FULL_ENRICHMENT) — an opinionated default that lets +// operators set the env var on the matrixd unit and have every +// gate decision honor it without per-request changes. Per +// 2026-04-29 cross-lineage scrum (Opus WARN): callers that want +// deterministic gate behavior independent of matrixd's env should +// pass ForceFullOverride explicitly in the body. type downgradeRequest struct { Mode string `json:"mode"` Model string `json:"model"` diff --git a/internal/corpusingest/ingest.go b/internal/corpusingest/ingest.go index 9038722..c7ae905 100644 --- a/internal/corpusingest/ingest.go +++ b/internal/corpusingest/ingest.go @@ -62,14 +62,27 @@ type Config struct { LogProgress time.Duration } -// Stats reports run outcomes. +// Stats reports run outcomes. FailedBatches counts embed-or-add +// batches that errored out and were skipped (partial-failure +// semantics). When non-zero, Run returns ErrPartialFailure so +// callers can't accidentally treat "1 of 313 batches succeeded" +// as a successful run. type Stats struct { - Scanned int64 - Embedded int64 - Added int64 - Wall time.Duration + Scanned int64 + Embedded int64 + Added int64 + Wall time.Duration + FailedBatches int64 } +// ErrPartialFailure signals that one or more batches errored during +// Run. Stats.FailedBatches has the count; the caller decides +// whether to retry / log / abort. Per 2026-04-29 cross-lineage +// scrum (Opus WARN): the original behavior returned nil even when +// 100% of batches failed silently, making "embedded=0/scanned=N" +// look like an empty corpus rather than a broken pipeline. +var ErrPartialFailure = errors.New("corpusingest: one or more batches failed") + // Run executes the ingest pipeline. Returns on source EOF after all // in-flight jobs drain, on context cancellation, or on the first // embed/add error (errors are logged via slog and the pipeline @@ -90,6 +103,7 @@ func Run(ctx context.Context, cfg Config, src Source) (Stats, error) { var ( totalEmbedded int64 totalAdded int64 + failedBatches int64 ) var wg sync.WaitGroup @@ -102,10 +116,11 @@ func Run(ctx context.Context, cfg Config, src Source) (Stats, error) { if err != nil { // Partial-failure semantics: log + continue. A wedged // embed batch shouldn't kill 8 workers' worth of - // progress; the operator decides whether to re-run - // based on the final Embedded vs Scanned delta. + // progress; Run returns ErrPartialFailure on any + // failure so callers can't miss the signal. slog.Warn("corpusingest: embed batch failed", "index", cfg.IndexName, "items", len(j.texts), "err", err) + atomic.AddInt64(&failedBatches, 1) continue } // Defense against a degraded embed backend that returns @@ -114,12 +129,14 @@ func Run(ctx context.Context, cfg Config, src Source) (Stats, error) { if len(vecs) != len(j.ids) { slog.Warn("corpusingest: embed returned wrong count", "index", cfg.IndexName, "want", len(j.ids), "got", len(vecs)) + atomic.AddInt64(&failedBatches, 1) continue } atomic.AddInt64(&totalEmbedded, int64(len(vecs))) if err := addBatch(ctx, cfg, j.ids, vecs, j.metas); err != nil { slog.Warn("corpusingest: add batch failed", "index", cfg.IndexName, "items", len(j.ids), "err", err) + atomic.AddInt64(&failedBatches, 1) continue } atomic.AddInt64(&totalAdded, int64(len(j.ids))) @@ -159,14 +176,19 @@ func Run(ctx context.Context, cfg Config, src Source) (Stats, error) { <-progressDone stats := Stats{ - Scanned: scanned, - Embedded: atomic.LoadInt64(&totalEmbedded), - Added: atomic.LoadInt64(&totalAdded), - Wall: time.Since(t0), + Scanned: scanned, + Embedded: atomic.LoadInt64(&totalEmbedded), + Added: atomic.LoadInt64(&totalAdded), + Wall: time.Since(t0), + FailedBatches: atomic.LoadInt64(&failedBatches), } if err != nil { return stats, err } + if stats.FailedBatches > 0 { + return stats, fmt.Errorf("%w: %d batches failed (embedded=%d added=%d scanned=%d)", + ErrPartialFailure, stats.FailedBatches, stats.Embedded, stats.Added, stats.Scanned) + } return stats, nil } diff --git a/internal/corpusingest/ingest_test.go b/internal/corpusingest/ingest_test.go index 1732d24..b8295e8 100644 --- a/internal/corpusingest/ingest_test.go +++ b/internal/corpusingest/ingest_test.go @@ -3,6 +3,7 @@ package corpusingest import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -341,6 +342,47 @@ func TestRun_ProgressLoggerExits(t *testing.T) { } } +// TestRun_NonzeroFailedBatchesReturnsError guards the 2026-04-29 +// scrum WARN: original behavior returned nil even when 100% of +// batches failed, making "embedded=0/scanned=N" look like an empty +// corpus rather than a broken pipeline. +func TestRun_NonzeroFailedBatchesReturnsError(t *testing.T) { + // Fake gateway that fails every embed call. + mux := http.NewServeMux() + mux.HandleFunc("/v1/vectors/index", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) + }) + mux.HandleFunc("/v1/embed", func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "embed failure injected", http.StatusBadGateway) + }) + mux.HandleFunc("/v1/vectors/index/", func(w http.ResponseWriter, r *http.Request) { + // shouldn't reach here since embed fails first + http.Error(w, "should not be called", http.StatusInternalServerError) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + + rows := make([]Row, 5) + for i := range rows { + rows[i] = Row{ID: fmt.Sprintf("r-%d", i), Text: "x"} + } + + stats, err := Run(context.Background(), Config{ + GatewayURL: srv.URL, IndexName: "fail_only", Dimension: 4, + EmbedBatch: 1, EmbedWorkers: 1, HTTPClient: srv.Client(), + }, &staticSource{rows: rows}) + + if !errors.Is(err, ErrPartialFailure) { + t.Errorf("want ErrPartialFailure, got %v", err) + } + if stats.FailedBatches == 0 { + t.Error("FailedBatches should be > 0 when embeds fail") + } + if stats.Added != 0 { + t.Errorf("Added: want 0 (all failed), got %d", stats.Added) + } +} + func TestRun_RequiresIndexName(t *testing.T) { _, err := Run(context.Background(), Config{Dimension: 4}, &staticSource{rows: nil}) diff --git a/internal/matrix/retrieve.go b/internal/matrix/retrieve.go index a0b9504..96dfd2b 100644 --- a/internal/matrix/retrieve.go +++ b/internal/matrix/retrieve.go @@ -26,7 +26,6 @@ import ( "log/slog" "net/http" "sort" - "strings" "sync" "time" @@ -101,10 +100,11 @@ func New(embeddURL, vectordURL string) *Retriever { // Errors surfaced to HTTP handlers. var ( - ErrEmptyCorpora = errors.New("matrix: corpora must be non-empty") - ErrEmptyQuery = errors.New("matrix: query_text or query_vector required") - ErrCorpus = errors.New("matrix: corpus search failed") // wraps vectord errors - ErrEmbed = errors.New("matrix: embed failed") + ErrEmptyCorpora = errors.New("matrix: corpora must be non-empty") + ErrEmptyQuery = errors.New("matrix: query_text or query_vector required") + ErrCorpus = errors.New("matrix: corpus search failed") // wraps vectord errors + ErrEmbed = errors.New("matrix: embed failed") + ErrCorpusNotFound = errors.New("matrix: corpus not found") // distinct sentinel for vectord 404 ) // Search runs the matrix retrieve+merge. @@ -217,13 +217,12 @@ func (r *Retriever) fetchPlaybookHits(ctx context.Context, qvec []float32, req S } rawHits, err := r.searchCorpus(ctx, corpus, qvec, topK) + if errors.Is(err, ErrCorpusNotFound) { + // Cold-start state: no Record call has happened yet, so the + // playbook corpus doesn't exist. Legit no-op, not an error. + return nil, nil + } if err != nil { - // vectord returns 404 for missing index. We treat that as - // "no playbook yet" — legitimate cold-start state, not an - // error. - if strings.Contains(err.Error(), "status 404") { - return nil, nil - } return nil, err } @@ -414,7 +413,12 @@ func (r *Retriever) embed(ctx context.Context, text, model string) ([]float32, e return out.Vectors[0], nil } -// searchCorpus calls vectord /vectors/index/{name}/search. +// searchCorpus calls vectord /vectors/index/{name}/search. Returns +// ErrCorpusNotFound (wrapped) on HTTP 404 so callers can distinguish +// "this corpus doesn't exist" from "this corpus errored." Per +// 2026-04-29 cross-lineage scrum (Opus + Kimi convergent): caught +// the original strings.Contains "status 404" detection that would +// silently break if the error format changed. func (r *Retriever) searchCorpus(ctx context.Context, corpus string, vec []float32, k int) ([]vectord.Result, error) { body, err := json.Marshal(map[string]any{"vector": vec, "k": k}) if err != nil { @@ -431,6 +435,9 @@ func (r *Retriever) searchCorpus(ctx context.Context, corpus string, vec []float return nil, err } defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + return nil, fmt.Errorf("%w: %s", ErrCorpusNotFound, corpus) + } if resp.StatusCode != http.StatusOK { b, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("status %d: %s", resp.StatusCode, b) diff --git a/internal/vectord/batch_bench_test.go b/internal/vectord/batch_bench_test.go index d42a474..4745c87 100644 --- a/internal/vectord/batch_bench_test.go +++ b/internal/vectord/batch_bench_test.go @@ -40,6 +40,41 @@ func BenchmarkBatchAdd(b *testing.B) { } } +// TestBatchAdd_IntraBatchDedup guards the 2026-04-29 scrum BLOCK: +// without dedup, coder/hnsw's "node not added" length-invariant +// panics when the same ID appears twice in one batch. Last-write- +// wins semantics; the second vector for a duplicate ID replaces the +// first. +func TestBatchAdd_IntraBatchDedup(t *testing.T) { + idx := mustIndex(t) + items := []BatchItem{ + {ID: "a", Vector: makeVec(768, 1)}, + {ID: "b", Vector: makeVec(768, 2)}, + {ID: "a", Vector: makeVec(768, 99)}, // duplicate — should win + } + if err := idx.BatchAdd(items); err != nil { + t.Fatalf("BatchAdd: %v", err) + } + if idx.Len() != 2 { + t.Errorf("Len: want 2, got %d", idx.Len()) + } + // "a" should hold the LATER vector (the 99 one), not the first. + v, _, ok := idx.Lookup("a") + if !ok { + t.Fatal("a not found") + } + if v[0] != 99 { + t.Errorf("last-write-wins: want vec[0]=99, got %v", v[0]) + } +} + +func makeVec(dim int, val float32) []float32 { + v := make([]float32, dim) + v[0] = val + v[1] = 1 // non-zero-norm under cosine + return v +} + func mustIndex(tb testing.TB) *Index { tb.Helper() idx, err := NewIndex(IndexParams{ diff --git a/internal/vectord/index.go b/internal/vectord/index.go index 30aeb34..e7f9dfd 100644 --- a/internal/vectord/index.go +++ b/internal/vectord/index.go @@ -245,15 +245,24 @@ type BatchItem struct { // error messages stay precise; reproducing it here would force // position-encoded errors on every consumer. // -// Intra-batch duplicate IDs are undefined behavior — coder/hnsw's -// internal "node not added" length-invariant fires on the second -// occurrence. Callers must de-dup before calling. The HTTP smoke -// uses unique IDs so this isn't an exercised path; documented for -// future callers. +// Intra-batch duplicate IDs: dedup'd internally with last-write-wins +// semantics (matches map-style behavior — second occurrence of an +// ID replaces the first). Without dedup, coder/hnsw's "node not +// added" length-invariant panics on the second occurrence. Caught +// by 2026-04-29 cross-lineage scrum (Opus BLOCK). func (i *Index) BatchAdd(items []BatchItem) error { if len(items) == 0 { return nil } + + // Intra-batch dedup, last-write-wins. Walk forward, record the + // LAST index for each ID, then keep only items whose index is + // the recorded last. Preserves order of last occurrences in the + // original positions. + if hasDup := containsDuplicateID(items); hasDup { + items = dedupBatchLastWins(items) + } + i.mu.Lock() defer i.mu.Unlock() @@ -285,6 +294,37 @@ func (i *Index) BatchAdd(items []BatchItem) error { return nil } +// containsDuplicateID is a fast pre-check — if no dups, skip the +// dedup allocation. Most batches won't have dups so this is a hot +// path. +func containsDuplicateID(items []BatchItem) bool { + seen := make(map[string]struct{}, len(items)) + for _, it := range items { + if _, ok := seen[it.ID]; ok { + return true + } + seen[it.ID] = struct{}{} + } + return false +} + +// dedupBatchLastWins keeps only the last occurrence of each ID, +// preserving the relative order of those last occurrences. This +// matches map-style "set X to A then to B" semantics: B wins. +func dedupBatchLastWins(items []BatchItem) []BatchItem { + lastIdx := make(map[string]int, len(items)) + for j, it := range items { + lastIdx[it.ID] = j + } + out := make([]BatchItem, 0, len(lastIdx)) + for j, it := range items { + if lastIdx[it.ID] == j { + out = append(out, it) + } + } + return out +} + // Delete removes id from the index. Returns true if present. func (i *Index) Delete(id string) bool { i.mu.Lock() diff --git a/scripts/staffing_500k/main.go b/scripts/staffing_500k/main.go index 848543b..c488037 100644 --- a/scripts/staffing_500k/main.go +++ b/scripts/staffing_500k/main.go @@ -16,9 +16,9 @@ import ( "context" "encoding/csv" "encoding/json" + "errors" "flag" "fmt" - "io" "log" "net/http" "os" @@ -133,10 +133,18 @@ func main() { LogProgress: 10 * time.Second, }, &workersCSV{cr: cr}) if err != nil { - log.Fatalf("ingest: %v", err) + // ErrPartialFailure means SOME batches failed but we still + // have a corpus to query. Report and continue rather than + // nuking the run for transient Ollama hiccups. + if errors.Is(err, corpusingest.ErrPartialFailure) { + fmt.Printf("[sc] WARN partial failure: %v\n", err) + } else { + log.Fatalf("ingest: %v", err) + } } - fmt.Printf("[sc] populate done: scanned=%d embedded=%d added=%d wall=%v\n", - stats.Scanned, stats.Embedded, stats.Added, stats.Wall.Round(time.Millisecond)) + fmt.Printf("[sc] populate done: scanned=%d embedded=%d added=%d failed=%d wall=%v\n", + stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches, + stats.Wall.Round(time.Millisecond)) } // Validate semantic queries against the populated index. @@ -211,7 +219,3 @@ func runQuery(hc *http.Client, gateway, q string) { } } -// io.EOF imported transitively via corpusingest; keep the explicit -// reference so a hypothetical future "EOF means done" check in this -// driver's Source impl doesn't need a fresh import line. -var _ = io.EOF diff --git a/scripts/staffing_candidates/main.go b/scripts/staffing_candidates/main.go index 48feb1f..5957a08 100644 --- a/scripts/staffing_candidates/main.go +++ b/scripts/staffing_candidates/main.go @@ -19,6 +19,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "flag" "fmt" "io" @@ -245,10 +246,15 @@ func main() { LogProgress: 5 * time.Second, }, src) if err != nil { - log.Fatalf("ingest: %v", err) + if errors.Is(err, corpusingest.ErrPartialFailure) { + fmt.Printf("[candidates] WARN partial failure: %v\n", err) + } else { + log.Fatalf("ingest: %v", err) + } } - fmt.Printf("[candidates] populate: scanned=%d embedded=%d added=%d wall=%v\n", - stats.Scanned, stats.Embedded, stats.Added, stats.Wall.Round(time.Millisecond)) + fmt.Printf("[candidates] populate: scanned=%d embedded=%d added=%d failed=%d wall=%v\n", + stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches, + stats.Wall.Round(time.Millisecond)) } // Reality test — run a real staffing query through /v1/matrix/search diff --git a/scripts/staffing_workers/main.go b/scripts/staffing_workers/main.go index e0758a5..9f0c57d 100644 --- a/scripts/staffing_workers/main.go +++ b/scripts/staffing_workers/main.go @@ -19,6 +19,7 @@ package main import ( "context" + "errors" "flag" "fmt" "io" @@ -282,9 +283,14 @@ func main() { LogProgress: 10 * time.Second, }, src) if err != nil { - log.Fatalf("ingest: %v", err) + if errors.Is(err, corpusingest.ErrPartialFailure) { + fmt.Printf("[workers] WARN partial failure: %v\n", err) + } else { + log.Fatalf("ingest: %v", err) + } } - fmt.Printf("[workers] populate: scanned=%d embedded=%d added=%d wall=%v\n", - stats.Scanned, stats.Embedded, stats.Added, stats.Wall.Round(time.Millisecond)) + fmt.Printf("[workers] populate: scanned=%d embedded=%d added=%d failed=%d wall=%v\n", + stats.Scanned, stats.Embedded, stats.Added, stats.FailedBatches, + stats.Wall.Round(time.Millisecond)) }