Cross-lineage scrum review on the 12 commits of this session
(afbb506..06e7152) via Rust gateway :3100 with Opus + Kimi +
Qwen3-coder. Results:
Real findings landed:
1. Opus BLOCK — duplicate entries within a single vectord BatchAdd
call tripped coder/hnsw's "node not added" length invariant and
panicked. Fixed with a last-write-wins dedup inside BatchAdd,
applied before the pre-pass. Regression test
TestBatchAdd_IntraBatchDedup added.
2. Opus + Kimi convergent WARN — strings.Contains(err.Error(),
"status 404") was brittle string-matching to detect cold-
start playbook state. Fixed: ErrCorpusNotFound sentinel
returned by searchCorpus on HTTP 404; fetchPlaybookHits
uses errors.Is.
3. Opus WARN — corpusingest.Run returned nil even when every batch
failed, masking broken pipelines as "empty corpora." Fixed: a new
Stats.FailedBatches counter, plus an ErrPartialFailure sentinel
returned whenever that counter is nonzero. New regression test
TestRun_NonzeroFailedBatchesReturnsError.
4. Opus WARN — dead var _ = io.EOF in staffing_500k/main.go
was justified by a fictional comment. Removed.
Drivers (staffing_500k, staffing_candidates, staffing_workers) were
updated to handle ErrPartialFailure gracefully — print a warning and
keep running queries — rather than exiting fatally on transient
hiccups, while still surfacing the failure clearly in the output.
Documented (no code change):
- Opus WARN: matrixd /matrix/downgrade reads
LH_FORCE_FULL_ENRICHMENT from process env when body omits
it. Comment now explains the opinionated default and points
callers wanting deterministic behavior to pass the field
explicitly.
False positives dismissed (caught and verified, NOT acted on):
A. Kimi BLOCK on errors.Is + wrapped error in cmd/matrixd:223.
Verified false: Search wraps with %w (fmt.Errorf("%w: %v",
ErrEmbed, err)), so errors.Is matches the chain correctly.
B. Kimi INFO "BatchAdd has no unit tests." Verified false:
batch_bench_test.go already exercises BatchAdd via
BenchmarkBatchAdd, and the new TestBatchAdd_IntraBatchDedup adds
a dedicated unit test.
C. Opus BLOCK on missing finite/zero-norm pre-validation in
cmd/vectord:280-291. Verified false: line 272 already calls
vectord.ValidateVector before BatchAdd, so finite + zero-
norm IS checked. Pre-validation is exhaustive.
D. Opus WARN on relevance.go tokenRe — Opus self-corrected
mid-finding upon realizing that the leading character counts
toward token length.
Qwen3-coder returned NO FINDINGS — a known issue with very long
diffs through the OpenRouter free tier. Lineage rotation worked
as designed: between them, Opus and Kimi caught everything Qwen
would have.
15-smoke regression sweep all green (D1-D6, G1, G1P, G2,
storaged_cap, pathway, matrix, relevance, downgrade, playbook).
Unit tests all green (corpusingest +1, vectord +1).
Per feedback_cross_lineage_review.md, convergent finding #2 (404
detection) is the highest-signal one — both Opus and Kimi flagged
it independently. The other Opus findings rest on single-reviewer
signal, but each one was verified against the actual code.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
230 lines
7.4 KiB
Go
230 lines
7.4 KiB
Go
// matrixd is the matrix indexer service. It wraps internal/matrix's
// Retriever with HTTP routes per docs/SPEC.md §3.4.
//
// Routes:
//
//	POST /matrix/search           — multi-corpus retrieve+merge,
//	                                with optional playbook boost
//	GET  /matrix/corpora          — list known vectord indexes
//	POST /matrix/relevance        — adjacency-pollution filter
//	POST /matrix/downgrade        — strong-model downgrade gate
//	POST /matrix/playbooks/record — record a (query → answer)
//	                                success for the learning loop
//
// matrixd talks to embedd (for query-text embedding) and vectord
// (for per-corpus search) via HTTP. Both URLs come from the
// [matrixd] config section; the gateway sets them to its own
// upstream URLs so matrixd inherits the same provider topology.
|
|
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/matrix"
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
|
)
|
|
|
|
// maxRequestBytes is the largest request body decodeJSON will read
// (4 MiB); anything bigger is answered with 413.
const maxRequestBytes = 4 * 1024 * 1024
|
|
|
|
func main() {
|
|
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
|
flag.Parse()
|
|
|
|
cfg, err := shared.LoadConfig(*configPath)
|
|
if err != nil {
|
|
slog.Error("config", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
if cfg.Matrixd.EmbeddURL == "" || cfg.Matrixd.VectordURL == "" {
|
|
slog.Error("matrixd: embedd_url and vectord_url required in [matrixd]")
|
|
os.Exit(1)
|
|
}
|
|
|
|
retriever := matrix.New(cfg.Matrixd.EmbeddURL, cfg.Matrixd.VectordURL)
|
|
h := &handlers{r: retriever}
|
|
|
|
if err := shared.Run("matrixd", cfg.Matrixd.Bind, h.register, cfg.Auth); err != nil {
|
|
slog.Error("server", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
type handlers struct {
|
|
r *matrix.Retriever
|
|
}
|
|
|
|
func (h *handlers) register(r chi.Router) {
|
|
r.Post("/matrix/search", h.handleSearch)
|
|
r.Get("/matrix/corpora", h.handleCorpora)
|
|
r.Post("/matrix/relevance", h.handleRelevance)
|
|
r.Post("/matrix/downgrade", h.handleDowngrade)
|
|
r.Post("/matrix/playbooks/record", h.handlePlaybookRecord)
|
|
}
|
|
|
|
func (h *handlers) handleSearch(w http.ResponseWriter, r *http.Request) {
|
|
var req matrix.SearchRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
resp, err := h.r.Search(r.Context(), req)
|
|
if err != nil {
|
|
writeMatrixError(w, err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// relevanceRequest is the POST /matrix/relevance body. Threshold
|
|
// defaults to matrix.DefaultRelevanceThreshold when zero.
|
|
type relevanceRequest struct {
|
|
Focus matrix.FocusFile `json:"focus"`
|
|
Chunks []matrix.CandidateChunk `json:"chunks"`
|
|
Threshold float64 `json:"threshold,omitempty"`
|
|
}
|
|
|
|
func (h *handlers) handleRelevance(w http.ResponseWriter, r *http.Request) {
|
|
var req relevanceRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
if len(req.Chunks) == 0 {
|
|
http.Error(w, "chunks must be non-empty", http.StatusBadRequest)
|
|
return
|
|
}
|
|
threshold := req.Threshold
|
|
if threshold == 0 {
|
|
threshold = matrix.DefaultRelevanceThreshold
|
|
}
|
|
res := matrix.FilterChunks(req.Focus, req.Chunks, threshold)
|
|
writeJSON(w, http.StatusOK, res)
|
|
}
|
|
|
|
// playbookRecordRequest is the JSON body accepted by
// POST /matrix/playbooks/record. QueryText, AnswerID, AnswerCorpus,
// Score and Tags are passed to matrix.NewPlaybookEntry; Corpus is
// optional and is forwarded unchanged to Retriever.Record.
// NOTE(review): an empty Corpus is documented to mean
// matrix.DefaultPlaybookCorpus; that fallback lives in internal/matrix,
// not in this file.
type playbookRecordRequest struct {
	QueryText    string   `json:"query_text"`
	AnswerID     string   `json:"answer_id"`
	AnswerCorpus string   `json:"answer_corpus"`
	Score        float64  `json:"score"`
	Tags         []string `json:"tags,omitempty"`
	Corpus       string   `json:"corpus,omitempty"`
}
|
|
|
|
func (h *handlers) handlePlaybookRecord(w http.ResponseWriter, r *http.Request) {
|
|
var req playbookRecordRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
entry := matrix.NewPlaybookEntry(req.QueryText, req.AnswerID, req.AnswerCorpus, req.Score, req.Tags)
|
|
if err := entry.Validate(); err != nil {
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
pbID, err := h.r.Record(r.Context(), entry, req.Corpus)
|
|
if err != nil {
|
|
slog.Warn("playbook record", "err", err)
|
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"playbook_id": pbID,
|
|
"query_text": entry.QueryText,
|
|
"answer_id": entry.AnswerID,
|
|
"answer_corpus": entry.AnswerCorpus,
|
|
"score": entry.Score,
|
|
})
|
|
}
|
|
|
|
// downgradeRequest is the JSON body accepted by POST /matrix/downgrade
// and mirrors matrix.DowngradeInput. Mode and Model are required.
//
// ForceFullOverride is a pointer so "absent" can be told apart from
// "false". When it is absent, the handler keeps the value taken from
// matrixd's own process environment (LH_FORCE_FULL_ENRICHMENT). This
// opinionated default lets operators set the variable once on the
// matrixd unit and have every gate decision honor it. Per the
// 2026-04-29 cross-lineage scrum (Opus WARN), callers that need gate
// behavior independent of matrixd's environment should always send
// force_full_override explicitly.
type downgradeRequest struct {
	Mode              string `json:"mode"`
	Model             string `json:"model"`
	ForcedMode        bool   `json:"forced_mode,omitempty"`
	ForceFullOverride *bool  `json:"force_full_override,omitempty"`
}
|
|
|
|
func (h *handlers) handleDowngrade(w http.ResponseWriter, r *http.Request) {
|
|
var req downgradeRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
if req.Mode == "" || req.Model == "" {
|
|
http.Error(w, "mode and model are required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
in := matrix.NewDowngradeInputFromEnv(req.Mode, req.Model, req.ForcedMode)
|
|
if req.ForceFullOverride != nil {
|
|
// Explicit body override beats env, useful for tooling that
|
|
// wants to ask "what would the gate do under these conditions"
|
|
// without env pollution.
|
|
in.ForceFullOverride = *req.ForceFullOverride
|
|
}
|
|
writeJSON(w, http.StatusOK, matrix.MaybeDowngrade(in))
|
|
}
|
|
|
|
func (h *handlers) handleCorpora(w http.ResponseWriter, r *http.Request) {
|
|
names, err := h.r.Corpora(r.Context())
|
|
if err != nil {
|
|
slog.Error("matrix corpora", "err", err)
|
|
http.Error(w, "vectord unavailable", http.StatusBadGateway)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"corpora": names, "count": len(names)})
|
|
}
|
|
|
|
func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {
|
|
defer r.Body.Close()
|
|
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
|
|
if err := json.NewDecoder(r.Body).Decode(v); err != nil {
|
|
var maxErr *http.MaxBytesError
|
|
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
|
|
http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
|
|
return false
|
|
}
|
|
http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// writeJSON sends v as a JSON response with the given status code and
// Content-Type application/json.
//
// v is marshalled before any header is written. If encoding fails (for
// example a NaN or ±Inf float, a channel, or a func anywhere in v), the
// client gets a 500 "internal" response instead of a committed status
// line with an empty body. The trailing newline keeps the output
// byte-identical to json.Encoder.Encode.
func writeJSON(w http.ResponseWriter, code int, v any) {
	body, err := json.Marshal(v)
	if err != nil {
		// Nothing has been sent yet, so a proper error status is still possible.
		slog.Error("matrix write json", "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	if _, err := w.Write(append(body, '\n')); err != nil {
		// Headers are already on the wire (typically the client went
		// away); all that's left is to record it.
		slog.Warn("matrix write json", "err", err)
	}
}
|
|
|
|
// writeMatrixError maps internal/matrix sentinels to HTTP statuses.
|
|
// Corpus / embed failures bubble up as 502 (the upstream service is
|
|
// what's wrong); validation errors are 400.
|
|
func writeMatrixError(w http.ResponseWriter, err error) {
|
|
switch {
|
|
case errors.Is(err, matrix.ErrEmptyCorpora),
|
|
errors.Is(err, matrix.ErrEmptyQuery):
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
case errors.Is(err, matrix.ErrCorpus),
|
|
errors.Is(err, matrix.ErrEmbed):
|
|
slog.Warn("matrix upstream", "err", err)
|
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
|
default:
|
|
slog.Error("matrix", "err", err)
|
|
http.Error(w, "internal", http.StatusInternalServerError)
|
|
}
|
|
}
|