migrate the strong-model auto-downgrade gate from a hardcoded weak list to cfg.Models.WeakModels. backward compatible: existing API preserved, callers that don't migrate keep using DefaultWeakModels. changes: - internal/matrix/downgrade.go: split IsWeakModel into rule-based base (`:free` suffix/infix) + literal-list lookup. New IsWeakModelInList(model, list) takes the config-supplied list. DowngradeInput grows a WeakModels field; nil falls back to DefaultWeakModels (preserves pre-phase-2 behavior). - internal/workflow/modes.go: add MatrixDowngradeWithWeakList(list) factory mirroring MatrixSearch's pattern. Plain MatrixDowngrade kept for backward compat. - cmd/matrixd/main.go: handlers struct holds weakModels populated from cfg.Models.WeakModels at startup; handleDowngrade threads it into every DowngradeInput. - cmd/observerd/main.go: registerBuiltinModes accepts weakModels and uses the factory variant. observerd reads cfg.Models.WeakModels in main(). end-to-end verified: downgrade + matrix + observer + workflow smokes all pass. Existing TestMaybeDowngrade_TruthTable + TestIsWeakModel unchanged (backward compat). Two new tests cover the config path: - TestIsWeakModelInList — covers rule + literal + empty + nil - TestMaybeDowngrade_WithConfigList — verifies cfg list overrides default Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
302 lines
10 KiB
Go
302 lines
10 KiB
Go
// matrixd is the matrix indexer service. Wraps internal/matrix's
// Retriever with HTTP routes per docs/SPEC.md §3.4.
//
// Routes:
//
//	POST /matrix/search            — multi-corpus retrieve+merge,
//	                                 with optional playbook boost
//	GET  /matrix/corpora           — list known vectord indexes
//	POST /matrix/relevance         — adjacency-pollution filter
//	POST /matrix/downgrade         — strong-model downgrade gate
//	POST /matrix/playbooks/record  — record a single (query → answer)
//	                                 success for the learning loop
//	POST /matrix/playbooks/bulk    — bulk-record N successes; useful
//	                                 for backfilling historical
//	                                 placement data into the
//	                                 playbook substrate
//
// matrixd talks to embedd (for query-text embedding) and vectord
// (for per-corpus search) via HTTP. Both URLs come from the
// [matrixd] config section; the gateway sets them to its own upstream
// URLs so matrixd inherits the same provider topology.
|
|
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/matrix"
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
|
)
|
|
|
|
// maxRequestBytes caps the size of any request body at 4 MiB.
const maxRequestBytes = 4 * 1024 * 1024
|
|
|
|
func main() {
|
|
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
|
flag.Parse()
|
|
|
|
cfg, err := shared.LoadConfig(*configPath)
|
|
if err != nil {
|
|
slog.Error("config", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
if cfg.Matrixd.EmbeddURL == "" || cfg.Matrixd.VectordURL == "" {
|
|
slog.Error("matrixd: embedd_url and vectord_url required in [matrixd]")
|
|
os.Exit(1)
|
|
}
|
|
|
|
retriever := matrix.New(cfg.Matrixd.EmbeddURL, cfg.Matrixd.VectordURL)
|
|
h := &handlers{r: retriever, weakModels: cfg.Models.WeakModels}
|
|
|
|
if err := shared.Run("matrixd", cfg.Matrixd.Bind, h.register, cfg.Auth); err != nil {
|
|
slog.Error("server", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
type handlers struct {
|
|
r *matrix.Retriever
|
|
// weakModels comes from cfg.Models.WeakModels at startup. Threaded
|
|
// into every DowngradeInput so the gate uses the configured list
|
|
// instead of matrix.DefaultWeakModels. nil/empty falls back to
|
|
// the package default — matches pre-Phase 2 behavior.
|
|
weakModels []string
|
|
}
|
|
|
|
func (h *handlers) register(r chi.Router) {
|
|
r.Post("/matrix/search", h.handleSearch)
|
|
r.Get("/matrix/corpora", h.handleCorpora)
|
|
r.Post("/matrix/relevance", h.handleRelevance)
|
|
r.Post("/matrix/downgrade", h.handleDowngrade)
|
|
r.Post("/matrix/playbooks/record", h.handlePlaybookRecord)
|
|
r.Post("/matrix/playbooks/bulk", h.handlePlaybookBulk)
|
|
}
|
|
|
|
func (h *handlers) handleSearch(w http.ResponseWriter, r *http.Request) {
|
|
var req matrix.SearchRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
resp, err := h.r.Search(r.Context(), req)
|
|
if err != nil {
|
|
writeMatrixError(w, err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// relevanceRequest is the POST /matrix/relevance body. Threshold
|
|
// defaults to matrix.DefaultRelevanceThreshold when zero.
|
|
type relevanceRequest struct {
|
|
Focus matrix.FocusFile `json:"focus"`
|
|
Chunks []matrix.CandidateChunk `json:"chunks"`
|
|
Threshold float64 `json:"threshold,omitempty"`
|
|
}
|
|
|
|
func (h *handlers) handleRelevance(w http.ResponseWriter, r *http.Request) {
|
|
var req relevanceRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
if len(req.Chunks) == 0 {
|
|
http.Error(w, "chunks must be non-empty", http.StatusBadRequest)
|
|
return
|
|
}
|
|
threshold := req.Threshold
|
|
if threshold == 0 {
|
|
threshold = matrix.DefaultRelevanceThreshold
|
|
}
|
|
res := matrix.FilterChunks(req.Focus, req.Chunks, threshold)
|
|
writeJSON(w, http.StatusOK, res)
|
|
}
|
|
|
|
// playbookRecordRequest is the JSON body accepted by
// POST /matrix/playbooks/record, and the element type of a bulk
// request's entries. Corpus may be omitted, in which case it defaults
// to matrix.DefaultPlaybookCorpus.
type playbookRecordRequest struct {
	// QueryText, AnswerID, AnswerCorpus, Score and Tags are forwarded
	// verbatim to matrix.NewPlaybookEntry.
	QueryText    string   `json:"query_text"`
	AnswerID     string   `json:"answer_id"`
	AnswerCorpus string   `json:"answer_corpus"`
	Score        float64  `json:"score"`
	Tags         []string `json:"tags,omitempty"`
	// Corpus optionally names the playbook corpus to record into.
	Corpus string `json:"corpus,omitempty"`
}
|
|
|
|
func (h *handlers) handlePlaybookRecord(w http.ResponseWriter, r *http.Request) {
|
|
var req playbookRecordRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
entry := matrix.NewPlaybookEntry(req.QueryText, req.AnswerID, req.AnswerCorpus, req.Score, req.Tags)
|
|
if err := entry.Validate(); err != nil {
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
pbID, err := h.r.Record(r.Context(), entry, req.Corpus)
|
|
if err != nil {
|
|
slog.Warn("playbook record", "err", err)
|
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"playbook_id": pbID,
|
|
"query_text": entry.QueryText,
|
|
"answer_id": entry.AnswerID,
|
|
"answer_corpus": entry.AnswerCorpus,
|
|
"score": entry.Score,
|
|
})
|
|
}
|
|
|
|
// playbookBulkRequest is the POST /matrix/playbooks/bulk body —
|
|
// component C (operational rating wiring). Used to backfill
|
|
// historical placement data, or batch-record a session's worth of
|
|
// coordinator click-tracking. Each Entry is recorded independently;
|
|
// failures are reported per-entry without aborting the batch.
|
|
type playbookBulkRequest struct {
|
|
Entries []playbookRecordRequest `json:"entries"`
|
|
Corpus string `json:"corpus,omitempty"` // applies to all if entry-level not set
|
|
}
|
|
|
|
// playbookBulkResult reports per-entry outcomes plus the aggregate
|
|
// count. Errors include the entry index so callers can locate the
|
|
// offending record without diffing.
|
|
type playbookBulkResult struct {
|
|
Recorded int `json:"recorded"`
|
|
Failed int `json:"failed"`
|
|
Results []playbookBulkItemResult `json:"results"`
|
|
}
|
|
|
|
// playbookBulkItemResult is the outcome for a single bulk entry. The
// bulk handler sets PlaybookID on success and Error on failure; the
// unset one is omitted from the JSON.
type playbookBulkItemResult struct {
	// Index is the entry's position in the request's entries array.
	Index int `json:"index"`
	// PlaybookID is the ID returned by a successful record.
	PlaybookID string `json:"playbook_id,omitempty"`
	// Error is the failure message for an entry that was not recorded.
	Error string `json:"error,omitempty"`
}
|
|
|
|
func (h *handlers) handlePlaybookBulk(w http.ResponseWriter, r *http.Request) {
|
|
var req playbookBulkRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
if len(req.Entries) == 0 {
|
|
http.Error(w, "entries must be non-empty", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
out := playbookBulkResult{
|
|
Results: make([]playbookBulkItemResult, len(req.Entries)),
|
|
}
|
|
for i, item := range req.Entries {
|
|
corpus := item.Corpus
|
|
if corpus == "" {
|
|
corpus = req.Corpus
|
|
}
|
|
entry := matrix.NewPlaybookEntry(item.QueryText, item.AnswerID, item.AnswerCorpus, item.Score, item.Tags)
|
|
if err := entry.Validate(); err != nil {
|
|
out.Results[i] = playbookBulkItemResult{Index: i, Error: err.Error()}
|
|
out.Failed++
|
|
continue
|
|
}
|
|
pbID, err := h.r.Record(r.Context(), entry, corpus)
|
|
if err != nil {
|
|
out.Results[i] = playbookBulkItemResult{Index: i, Error: err.Error()}
|
|
out.Failed++
|
|
continue
|
|
}
|
|
out.Results[i] = playbookBulkItemResult{Index: i, PlaybookID: pbID}
|
|
out.Recorded++
|
|
}
|
|
writeJSON(w, http.StatusOK, out)
|
|
}
|
|
|
|
// downgradeRequest is the JSON body accepted by POST /matrix/downgrade
// and mirrors matrix.DowngradeInput.
//
// ForceFullOverride is a pointer so that "omitted" can be told apart
// from an explicit false. When it is omitted, the value falls back to
// matrixd's own process environment (LH_FORCE_FULL_ENRICHMENT) — an
// opinionated default that lets operators set the variable on the
// matrixd unit and have every gate decision honor it with no
// per-request changes. Per the 2026-04-29 cross-lineage scrum (Opus
// WARN): callers that need gate behavior that is deterministic and
// independent of matrixd's environment should pass ForceFullOverride
// explicitly in the body.
type downgradeRequest struct {
	// Mode and Model are both required by the handler.
	Mode  string `json:"mode"`
	Model string `json:"model"`
	// ForcedMode is forwarded to the downgrade input unchanged.
	ForcedMode bool `json:"forced_mode,omitempty"`
	// ForceFullOverride, when present, beats the environment default.
	ForceFullOverride *bool `json:"force_full_override,omitempty"`
}
|
|
|
|
func (h *handlers) handleDowngrade(w http.ResponseWriter, r *http.Request) {
|
|
var req downgradeRequest
|
|
if !decodeJSON(w, r, &req) {
|
|
return
|
|
}
|
|
if req.Mode == "" || req.Model == "" {
|
|
http.Error(w, "mode and model are required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
in := matrix.NewDowngradeInputFromEnv(req.Mode, req.Model, req.ForcedMode)
|
|
if req.ForceFullOverride != nil {
|
|
// Explicit body override beats env, useful for tooling that
|
|
// wants to ask "what would the gate do under these conditions"
|
|
// without env pollution.
|
|
in.ForceFullOverride = *req.ForceFullOverride
|
|
}
|
|
in.WeakModels = h.weakModels
|
|
writeJSON(w, http.StatusOK, matrix.MaybeDowngrade(in))
|
|
}
|
|
|
|
func (h *handlers) handleCorpora(w http.ResponseWriter, r *http.Request) {
|
|
names, err := h.r.Corpora(r.Context())
|
|
if err != nil {
|
|
slog.Error("matrix corpora", "err", err)
|
|
http.Error(w, "vectord unavailable", http.StatusBadGateway)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"corpora": names, "count": len(names)})
|
|
}
|
|
|
|
func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {
|
|
defer r.Body.Close()
|
|
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
|
|
if err := json.NewDecoder(r.Body).Decode(v); err != nil {
|
|
var maxErr *http.MaxBytesError
|
|
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
|
|
http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
|
|
return false
|
|
}
|
|
http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// writeJSON sends v as a JSON response with the given status code.
// The header and status are written before encoding, so an encode
// failure can only be logged, not turned into a different status.
func writeJSON(w http.ResponseWriter, code int, v any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)

	enc := json.NewEncoder(w)
	err := enc.Encode(v)
	if err == nil {
		return
	}
	slog.Warn("matrix write json", "err", err)
}
|
|
|
|
// writeMatrixError maps internal/matrix sentinels to HTTP statuses.
|
|
// Corpus / embed failures bubble up as 502 (the upstream service is
|
|
// what's wrong); validation errors are 400.
|
|
func writeMatrixError(w http.ResponseWriter, err error) {
|
|
switch {
|
|
case errors.Is(err, matrix.ErrEmptyCorpora),
|
|
errors.Is(err, matrix.ErrEmptyQuery):
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
case errors.Is(err, matrix.ErrCorpus),
|
|
errors.Is(err, matrix.ErrEmbed):
|
|
slog.Warn("matrix upstream", "err", err)
|
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
|
default:
|
|
slog.Error("matrix", "err", err)
|
|
http.Error(w, "internal", http.StatusInternalServerError)
|
|
}
|
|
}
|