root 06e71520c4 matrix: playbook memory + boost — SPEC §3.4 component 5 of 5 (LEARNING LOOP)
Closes SPEC §3.4. The matrix indexer is now a learning meta-index per
feedback_meta_index_vision.md — every successful (query → answer)
pair recorded via /matrix/playbooks/record boosts that answer for
future similar queries.

This is the architectural piece that lifts vectord from "static
hybrid search" to the meta-index J originally framed in Phase 19 of
the Rust system.

What's new:
  - internal/matrix/playbook.go — PlaybookEntry, PlaybookHit,
    ApplyPlaybookBoost. Pure-function boost math:
      distance' = distance * (1 - 0.5 * score)
    Score 0 = no boost (factor 1.0); score 1 = halve distance
    (factor 0.5). The reduction is deliberately capped at 50% so
    a single high-confidence playbook can't dominate the base
    ranking forever (runaway-feedback-loop guard).
  - Retriever.Record(entry, corpus) — embeds query_text, ensures
    playbook corpus exists (idempotent), upserts via deterministic
    sha256-derived ID (last score wins on re-record of same triple).
  - Retriever.Search extended with UsePlaybook + PlaybookCorpus +
    PlaybookTopK + PlaybookMaxDistance. Reuses the query vector —
    no extra embed call. Missing-corpus 404 = no-op (cold-start
    state before any Record call), not an error.
  - POST /v1/matrix/playbooks/record (matrixd) — caller submits
    {query_text, answer_id, answer_corpus, score, tags?}; gets
    {playbook_id} back.

Storage: a vectord index named "playbook_memory" (configurable per
request) with embed(query_text) as the vector and the
PlaybookEntry JSON as metadata. Just another corpus — observable
from /vectors/index, persistable through G1P, etc.

Match key for boost: (AnswerID, AnswerCorpus). Cross-corpus ID
collisions don't false-match — verified by
TestApplyPlaybookBoost_CorpusAttributionRespected.

End-to-end smoke (scripts/playbook_smoke.sh, all assertions PASS):
  - Baseline search: widget-c at distance 0.6566 (rank 3)
  - Record playbook: query → widget-c, score=1.0
  - Re-search with use_playbook=true:
      widget-c distance: 0.3283 (rank 2)
      ratio: 0.5 EXACTLY (matches boost math precisely)
      playbook_boosted: 1
  - widget-c jumped from #3 to #2 — learning loop visible

Tests:
  - 8 unit tests in internal/matrix/playbook_test.go covering
    Validate, BoostFactor (5 cases), the no-boost identity, the
    boost-moves-result-up scenario, highest-score wins on duplicate
    matches, cross-corpus attribution, JSON round-trip, and
    rejection of empty metadata
  - scripts/playbook_smoke.sh integration test (3 assertions PASS)

15-smoke regression sweep all green (D1-D6, G1, G1P, G2,
storaged_cap, pathway, matrix, relevance, downgrade, playbook).

SPEC §3.4 NOW COMPLETE: 5 of 5 components shipped. The matrix
indexer's port is done as a substrate; remaining work is operational
(rating signal sources, telemetry, eventual structured filtering for
staffing data — none in §3.4).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 19:34:24 -05:00

446 lines
14 KiB
Go

// Package matrix is the multi-corpus retrieval layer above vectord.
// Per docs/SPEC.md §3.4: the matrix indexer composes N single-corpus
// vectord indexes into one retrieve+merge surface, with corpus
// attribution preserved per result. This file holds the multi-corpus
// retrieve+merge path (component 2 of the dependency-ordered port
// plan) plus the playbook learning-loop hooks (component 5): Record
// and the optional boost step in Search. The relevance filter and
// strong-model downgrade gate are not implemented in this file.
//
// Why corpus-as-shard rather than hash-shard a single index:
// different corpora have distinct topology and distinct retrieval
// intent (workers vs candidates vs scrum_findings vs lakehouse_arch).
// Multi-corpus search merges across them by distance — that IS the
// matrix indexer's whole purpose. See feedback_meta_index_vision.md
// and project_small_model_pipeline_vision.md.
package matrix
import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"sync"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
)
// Result is a single hit in the merged, cross-corpus result list.
//
// Corpus names the index the hit was retrieved from. Search fills it
// in for every hit; dropping it would defeat the matrix's purpose,
// because knowing which corpus contributed a hit is half the signal.
type Result struct {
	// ID is the item identifier as returned by the corpus search.
	ID string `json:"id"`
	// Distance is the hit's distance to the query; Search orders
	// results by ascending Distance.
	Distance float32 `json:"distance"`
	// Corpus is the name of the index that produced this hit.
	Corpus string `json:"corpus"`
	// Metadata is the item's metadata, passed through undecoded.
	Metadata json.RawMessage `json:"metadata,omitempty"`
}
// SearchRequest is the input to Retriever.Search.
//
// The query is given either as QueryText (embedded by the matrix via
// embedd) or as QueryVector (already embedded by the caller). At
// least one must be set; when both are present QueryVector wins and
// no embed call is made.
//
// The Playbook* fields drive the learning loop (component 5). They
// are only consulted when UsePlaybook is true.
type SearchRequest struct {
	// QueryText is the raw query; embedded only when QueryVector is empty.
	QueryText string `json:"query_text,omitempty"`
	// QueryVector is a caller-supplied embedding; takes precedence
	// over QueryText.
	QueryVector []float32 `json:"query_vector,omitempty"`
	// Corpora lists the index names to search; must be non-empty.
	Corpora []string `json:"corpora"`
	// K is the number of merged results to return; must be > 0.
	K int `json:"k"`
	// PerCorpusK is how many hits to request from each corpus;
	// values <= 0 fall back to K.
	PerCorpusK int `json:"per_corpus_k,omitempty"`
	// Model is forwarded to the embed call when QueryText is embedded.
	Model string `json:"model,omitempty"`

	// UsePlaybook enables the post-merge step: fetch the most similar
	// past queries from PlaybookCorpus and apply a distance boost to
	// any current results that match a recorded answer.
	UsePlaybook bool `json:"use_playbook,omitempty"`
	// PlaybookCorpus is the playbook index name; empty means
	// DefaultPlaybookCorpus.
	PlaybookCorpus string `json:"playbook_corpus,omitempty"`
	// PlaybookTopK is the number of similar past queries to consider;
	// values <= 0 mean DefaultPlaybookTopK.
	PlaybookTopK int `json:"playbook_top_k,omitempty"`
	// PlaybookMaxDistance is the distance ceiling for a past query to
	// count as "similar enough"; values <= 0 mean
	// DefaultPlaybookMaxDistance.
	PlaybookMaxDistance float64 `json:"playbook_max_distance,omitempty"`
}
// SearchResponse is what Retriever.Search returns: the merged hits
// plus bookkeeping about where they came from.
type SearchResponse struct {
	// Results holds the merged hits, truncated to the requested K.
	Results []Result `json:"results"`
	// PerCorpusCounts maps each searched corpus to the number of hits
	// it returned, so callers can spot "this corpus returned nothing"
	// without re-querying.
	PerCorpusCounts map[string]int `json:"per_corpus_counts"`
	// PlaybookBoosted is the count reported by the playbook boost
	// step: how many results received a boost from playbook memory.
	// Useful as telemetry on how much the learning loop influenced
	// the query.
	PlaybookBoosted int `json:"playbook_boosted,omitempty"`
}
// Retriever talks to embedd and vectord over HTTP on behalf of the
// matrix layer. Beyond the shared client and the two base URLs it
// carries no state, so one instance can be shared across goroutines.
type Retriever struct {
	// httpClient is used for every outbound call (embedd and vectord).
	httpClient *http.Client
	// embeddURL is the base URL of the embedding service.
	embeddURL string
	// vectordURL is the base URL of the vector index service.
	vectordURL string
}
// New builds a Retriever that sends embed calls to embeddURL and
// index calls to vectordURL. Both are gateway-internal upstreams
// (usually 127.0.0.1:3216 and :3215 respectively). Every outbound
// request shares one HTTP client with a 30-second timeout.
func New(embeddURL, vectordURL string) *Retriever {
	client := &http.Client{Timeout: 30 * time.Second}

	r := new(Retriever)
	r.httpClient = client
	r.embeddURL = embeddURL
	r.vectordURL = vectordURL
	return r
}
// Sentinel errors returned by Search, intended to be matched by HTTP
// handlers with errors.Is.
var (
	// ErrEmptyCorpora is returned when the request lists no corpora.
	ErrEmptyCorpora = errors.New("matrix: corpora must be non-empty")

	// ErrEmptyQuery is returned when neither a query vector nor query
	// text was supplied.
	ErrEmptyQuery = errors.New("matrix: query_text or query_vector required")

	// ErrCorpus marks a failed per-corpus search; it wraps the
	// underlying vectord error.
	ErrCorpus = errors.New("matrix: corpus search failed")

	// ErrEmbed marks a failure to embed the query text.
	ErrEmbed = errors.New("matrix: embed failed")
)
// Search runs the matrix retrieve+merge.
//
// Flow:
//  1. Validate the request; PerCorpusK <= 0 defaults to K.
//  2. Resolve the query vector: QueryVector if supplied, otherwise
//     embed QueryText using Model.
//  3. Query every corpus in parallel for PerCorpusK hits each.
//  4. Merge all hits, stable-sort by ascending distance, keep the top K.
//  5. If UsePlaybook is set, look up playbook memory with the same
//     query vector and apply the boost to the kept results.
//
// Error policy: fail-loud on any corpus error. Silent partial results
// would lie about what was actually searched, which defeats the
// indexer's coverage guarantee. Callers that want best-effort can
// catch the error and re-issue with a smaller corpora list.
//
// Returned errors:
//   - ErrEmptyCorpora when req.Corpora is empty.
//   - an unnamed "k must be > 0" error when req.K <= 0.
//   - ErrEmptyQuery when neither QueryVector nor QueryText is set.
//   - ErrEmbed (wrapping the cause) when embedding QueryText fails.
//   - ErrCorpus (wrapping the cause) for the first failing corpus, in
//     req.Corpora order.
//
// Both the sentinel and the underlying cause are wrapped with %w, so
// errors.Is/errors.As match either one (for example ErrEmbed and
// context.DeadlineExceeded on the same error value).
//
// A failed playbook lookup never fails the search: it is logged and
// the boost is skipped.
func (r *Retriever) Search(ctx context.Context, req SearchRequest) (*SearchResponse, error) {
	if len(req.Corpora) == 0 {
		return nil, ErrEmptyCorpora
	}
	if req.K <= 0 {
		return nil, errors.New("matrix: k must be > 0")
	}
	if req.PerCorpusK <= 0 {
		req.PerCorpusK = req.K
	}

	// Resolve query → vector. A caller-supplied vector wins over text.
	qvec := req.QueryVector
	if len(qvec) == 0 {
		if req.QueryText == "" {
			return nil, ErrEmptyQuery
		}
		v, err := r.embed(ctx, req.QueryText, req.Model)
		if err != nil {
			// %w on both operands keeps the cause in the error chain.
			return nil, fmt.Errorf("%w: %w", ErrEmbed, err)
		}
		qvec = v
	}

	// Parallel search across corpora. Each shard is independent and
	// writes only its own slot of results, so no locking is needed;
	// fan-out + collect with WaitGroup is cleaner than channels-only.
	type shardResult struct {
		corpus string
		hits   []vectord.Result
		err    error
	}
	results := make([]shardResult, len(req.Corpora))
	var wg sync.WaitGroup
	for i, c := range req.Corpora {
		wg.Add(1)
		go func(i int, corpus string) {
			defer wg.Done()
			hits, err := r.searchCorpus(ctx, corpus, qvec, req.PerCorpusK)
			results[i] = shardResult{corpus: corpus, hits: hits, err: err}
		}(i, c)
	}
	wg.Wait()

	// Merge. allHits intentionally stays nil when nothing was found so
	// the response shape is unchanged for existing callers.
	var allHits []Result
	perCorpus := make(map[string]int, len(req.Corpora))
	for _, s := range results {
		if s.err != nil {
			return nil, fmt.Errorf("%w: %s: %w", ErrCorpus, s.corpus, s.err)
		}
		perCorpus[s.corpus] = len(s.hits)
		for _, h := range s.hits {
			allHits = append(allHits, Result{
				ID: h.ID, Distance: h.Distance, Corpus: s.corpus, Metadata: h.Metadata,
			})
		}
	}

	// Stable sort so equal-distance ties keep input order (corpus
	// order from the request, then each corpus's own result order).
	// This matters for deterministic test assertions.
	sort.SliceStable(allHits, func(i, j int) bool {
		return allHits[i].Distance < allHits[j].Distance
	})
	if len(allHits) > req.K {
		allHits = allHits[:req.K]
	}
	resp := &SearchResponse{Results: allHits, PerCorpusCounts: perCorpus}

	// Playbook boost (component 5). Reuses the query vector — no
	// extra embed call. If the playbook corpus doesn't exist (first
	// search before any Record), the lookup gracefully no-ops.
	//
	// NOTE(review): the boost only sees the already-truncated top-K
	// slice, so a hit cut off above can never be promoted by playbook
	// memory — confirm this is intended.
	if req.UsePlaybook {
		hits, err := r.fetchPlaybookHits(ctx, qvec, req)
		if err != nil {
			// Don't fail the whole search on playbook errors — the
			// boost is opportunistic. Log + continue.
			slog.Warn("matrix: playbook lookup failed; skipping boost", "err", err)
		} else if len(hits) > 0 {
			resp.PlaybookBoosted = ApplyPlaybookBoost(resp.Results, hits)
		}
	}
	return resp, nil
}
// fetchPlaybookHits queries the playbook corpus with the same query
// vector used for the main search and returns the decoded entries
// whose distance is within the configured ceiling.
//
// Defaults: an empty req.PlaybookCorpus means DefaultPlaybookCorpus;
// req.PlaybookTopK <= 0 means DefaultPlaybookTopK;
// req.PlaybookMaxDistance <= 0 means DefaultPlaybookMaxDistance.
//
// A missing playbook corpus (vectord 404) returns nil, nil: that is
// the legitimate cold-start state before any Record call. Any other
// search error is returned to the caller. Entries whose metadata
// cannot be decoded are logged and skipped rather than failing the
// lookup.
func (r *Retriever) fetchPlaybookHits(ctx context.Context, qvec []float32, req SearchRequest) ([]PlaybookHit, error) {
	corpus := req.PlaybookCorpus
	if corpus == "" {
		corpus = DefaultPlaybookCorpus
	}
	topK := req.PlaybookTopK
	if topK <= 0 {
		topK = DefaultPlaybookTopK
	}
	maxDist := req.PlaybookMaxDistance
	if maxDist <= 0 {
		maxDist = DefaultPlaybookMaxDistance
	}

	rawHits, err := r.searchCorpus(ctx, corpus, qvec, topK)
	if err != nil {
		// searchCorpus reports non-200 responses as "status <code>: <body>".
		// Match only that leading status, not the whole message: a
		// substring match would also swallow e.g. a 500 whose body
		// happens to mention "status 404".
		if strings.HasPrefix(err.Error(), "status 404:") {
			return nil, nil
		}
		return nil, err
	}

	out := make([]PlaybookHit, 0, len(rawHits))
	for _, h := range rawHits {
		// Past queries that are too far from the current one are not
		// "similar enough" to contribute a boost.
		if float64(h.Distance) > maxDist {
			continue
		}
		entry, err := UnmarshalPlaybookMetadata(h.Metadata)
		if err != nil {
			slog.Warn("matrix: skip malformed playbook entry", "id", h.ID, "err", err)
			continue
		}
		out = append(out, PlaybookHit{
			PlaybookID: h.ID,
			Distance:   h.Distance,
			Entry:      entry,
		})
	}
	return out, nil
}
// Record persists one (query → answer) playbook entry and returns
// the ID it was stored under.
//
// Steps, in order: validate the entry, embed entry.QueryText via
// embedd (default model), make sure the target corpus exists
// (idempotent create sized to the embedding), stamp RecordedAtNs
// with the current time if the caller left it zero, and write the
// entry as a single vectord item whose metadata is the entry's JSON.
//
// The item ID is derived deterministically from (query_text,
// answer_id, answer_corpus), so recording the same triple again
// upserts and the last score wins. Callers that want to accumulate
// distinct samples can vary any one of the three.
//
// An empty corpus means DefaultPlaybookCorpus.
func (r *Retriever) Record(ctx context.Context, entry PlaybookEntry, corpus string) (string, error) {
	if verr := entry.Validate(); verr != nil {
		return "", verr
	}

	target := corpus
	if target == "" {
		target = DefaultPlaybookCorpus
	}

	vec, err := r.embed(ctx, entry.QueryText, "")
	if err != nil {
		return "", fmt.Errorf("playbook record embed: %w", err)
	}

	// The corpus dimension is taken from the embedding just produced.
	if err = r.ensureCorpus(ctx, target, len(vec)); err != nil {
		return "", fmt.Errorf("playbook ensure corpus: %w", err)
	}

	// Only stamp the time when the caller did not provide one.
	if entry.RecordedAtNs == 0 {
		entry.RecordedAtNs = time.Now().UnixNano()
	}

	id := playbookID(entry.QueryText, entry.AnswerID, entry.AnswerCorpus)

	payload, err := entry.MarshalMetadata()
	if err != nil {
		return "", err
	}

	if err = r.addItem(ctx, target, id, vec, payload); err != nil {
		return "", fmt.Errorf("playbook add: %w", err)
	}
	return id, nil
}
// playbookIDEscaper escapes the two characters that would otherwise
// make the "|"-joined hash input ambiguous: the delimiter itself and
// the escape character.
var playbookIDEscaper = strings.NewReplacer(`\`, `\\`, `|`, `\|`)

// playbookID returns "pb-" followed by the first 8 bytes (16 hex
// chars) of a SHA-256 over the (query, answerID, answerCorpus)
// triple. It is deterministic, so re-recording the same triple
// yields the same ID.
//
// Each field is escaped before the three are joined with "|".
// Without escaping, distinct triples such as ("a|b", "c", x) and
// ("a", "b|c", x) would produce the same hash input and therefore
// the same ID, letting one playbook entry silently overwrite
// another on upsert.
//
// Compatibility: for fields containing neither "|" nor "\" the hash
// input is byte-identical to the unescaped join, so IDs for such
// triples are unchanged. Only triples containing one of those two
// characters get a new ID.
func playbookID(query, answerID, answerCorpus string) string {
	key := playbookIDEscaper.Replace(query) + "|" +
		playbookIDEscaper.Replace(answerID) + "|" +
		playbookIDEscaper.Replace(answerCorpus)
	sum := sha256.Sum256([]byte(key))
	return "pb-" + hex.EncodeToString(sum[:8])
}
// ensureCorpus creates the vectord index `name` with the given
// dimension and cosine distance if it does not already exist.
//
// It POSTs to /vectors/index and treats both 201 (created) and 409
// (already exists) as success, which makes the call safe to repeat.
// Any other status is returned as an error that includes a bounded
// prefix of the response body, matching the other vectord helpers in
// this file. A non-positive dim is rejected before any request is
// sent.
//
// NOTE(review): on 409 the existing index's dimension is not
// compared with dim — confirm vectord rejects mismatched vectors on
// add.
func (r *Retriever) ensureCorpus(ctx context.Context, name string, dim int) error {
	if dim <= 0 {
		return fmt.Errorf("ensure %q: invalid dimension %d", name, dim)
	}
	body, err := json.Marshal(map[string]any{
		"name": name, "dimension": dim, "distance": "cosine",
	})
	if err != nil {
		return err
	}
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
		r.vectordURL+"/vectors/index", bytes.NewReader(body))
	if err != nil {
		return err
	}
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := r.httpClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusCreated, http.StatusConflict:
		// Drain so the transport can reuse the connection; a drain
		// failure does not change the outcome of the create.
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}
	// Best-effort read of the error body, capped so a misbehaving
	// upstream cannot balloon the error message.
	b, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10))
	return fmt.Errorf("ensure %q: status %d: %s", name, resp.StatusCode, b)
}
// addItem POSTs a single-item batch to /vectors/index/{name}/add,
// storing vec under id in corpus with meta as the item's metadata.
//
// The corpus name is path-escaped before being placed in the URL, so
// a name containing "/", "?", "#" or "%" stays a single path segment
// instead of changing which endpoint is hit. Names made only of
// unreserved characters are sent unchanged.
//
// Only 200 is treated as success; any other status is returned as an
// error carrying a bounded prefix of the response body.
func (r *Retriever) addItem(ctx context.Context, corpus, id string, vec []float32, meta json.RawMessage) error {
	body, err := json.Marshal(map[string]any{
		"items": []map[string]any{
			{"id": id, "vector": vec, "metadata": meta},
		},
	})
	if err != nil {
		return err
	}
	endpoint := r.vectordURL + "/vectors/index/" + url.PathEscape(corpus) + "/add"
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return err
	}
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := r.httpClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// Best-effort, size-capped read of the upstream error text.
		b, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10))
		return fmt.Errorf("add %q: status %d: %s", corpus, resp.StatusCode, b)
	}
	// The success body is not needed, but draining it lets the
	// transport reuse the connection; a drain failure is harmless.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// Corpora lists the names of the indexes known to vectord. It is a
// thin proxy over GET /vectors/index, exposed at the matrix layer so
// callers do not need direct access to vectord.
//
// A non-200 response is returned as an error that includes the
// response body; a body that is not valid JSON yields the decode
// error.
func (r *Retriever) Corpora(ctx context.Context) ([]string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.vectordURL+"/vectors/index", nil)
	if err != nil {
		return nil, err
	}

	resp, err := r.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		raw, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("vectord index list: status %d: %s", code, raw)
	}

	// Only the "names" field of the listing is consumed.
	var listing struct {
		Names []string `json:"names"`
	}
	if decErr := json.NewDecoder(resp.Body).Decode(&listing); decErr != nil {
		return nil, decErr
	}
	return listing.Names, nil
}
// embed turns one text into a vector by POSTing to embedd's /embed
// endpoint. It uses the batched request shape with exactly one text
// and returns the first vector of the response. The model string is
// forwarded as-is (it may be empty).
//
// The original note on this helper says embedd's LRU cache absorbs
// repeat queries (commit 56844c3).
//
// Errors: transport and encoding failures are returned unchanged; a
// non-200 response becomes an error carrying the status and body; a
// response with zero vectors is reported as
// "embed returned no vectors".
func (r *Retriever) embed(ctx context.Context, text, model string) ([]float32, error) {
	payload, err := json.Marshal(map[string]any{"texts": []string{text}, "model": model})
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.embeddURL+"/embed", bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := r.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		raw, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("embed status %d: %s", code, raw)
	}

	var decoded struct {
		Vectors [][]float32 `json:"vectors"`
	}
	if decErr := json.NewDecoder(resp.Body).Decode(&decoded); decErr != nil {
		return nil, decErr
	}
	if len(decoded.Vectors) > 0 {
		return decoded.Vectors[0], nil
	}
	return nil, errors.New("embed returned no vectors")
}
// searchCorpus calls vectord's /vectors/index/{name}/search for one
// corpus and returns up to k hits for vec.
//
// The corpus name is path-escaped before being placed in the URL, so
// a request-supplied name containing "/", "?", "#" or "%" stays a
// single path segment instead of redirecting the call to a different
// endpoint. Names made only of unreserved characters are sent
// unchanged.
//
// A non-200 response is returned as an error of the exact form
// "status <code>: <body>" (body size-capped). fetchPlaybookHits
// matches on the leading "status 404" of that message to detect a
// missing index, so the prefix must not change.
func (r *Retriever) searchCorpus(ctx context.Context, corpus string, vec []float32, k int) ([]vectord.Result, error) {
	body, err := json.Marshal(map[string]any{"vector": vec, "k": k})
	if err != nil {
		return nil, err
	}
	endpoint := r.vectordURL + "/vectors/index/" + url.PathEscape(corpus) + "/search"
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := r.httpClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// Best-effort, size-capped read of the upstream error text.
		b, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10))
		return nil, fmt.Errorf("status %d: %s", resp.StatusCode, b)
	}
	var out struct {
		Results []vectord.Result `json:"results"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return out.Results, nil
}