matrixd: multi-corpus retrieve+merge — SPEC §3.4 component 2 of 5
Lands the matrix indexer's first piece per docs/SPEC.md §3.4:
multi-corpus retrieve+merge with corpus attribution per result.
Future components (relevance filter, downgrade gate, learning-loop
integration) layer on top of this surface.
Architecture:
- internal/matrix/retrieve.go — Retriever takes (query, corpora,
k, per_corpus_k), parallel-fans across vectord indexes, merges
by distance ascending, preserves corpus origin per hit
- cmd/matrixd — HTTP service on :3217, fronts /v1/matrix/*
- gateway proxy + [matrixd] config + lakehouse.toml entry
- Either query_text (matrix calls embedd) or query_vector
(caller pre-embedded) — vector takes precedence if both set
Error policy: fail-loud on any corpus error. Silent partial returns
would lie about coverage, defeating the matrix's whole purpose.
Bubbles vectord errors as 502 (upstream), validation as 400.
Smoke (scripts/matrix_smoke.sh, 6 assertions PASS first try):
- /matrix/corpora lists indexes
- Multi-corpus search returns hits from BOTH corpora
- Top hit is the globally-closest across all corpora
(b-near beats a-near at distance 0.05 vs 0.1 — proves merge)
- Metadata round-trips through the merge
- Distances ascending in result list
- Negative paths: empty corpora → 400, missing corpus → 502,
no query → 400
12-smoke regression sweep all green (D1-D6, G1, G1P, G2,
storaged_cap, pathway, matrix).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
a7620c8b6f
commit
c1d96b7b60
@ -45,6 +45,7 @@ func main() {
|
||||
"vectord_url": cfg.Gateway.VectordURL,
|
||||
"embedd_url": cfg.Gateway.EmbeddURL,
|
||||
"pathwayd_url": cfg.Gateway.PathwaydURL,
|
||||
"matrixd_url": cfg.Gateway.MatrixdURL,
|
||||
}
|
||||
for k, v := range upstreams {
|
||||
if v == "" {
|
||||
@ -65,6 +66,7 @@ func main() {
|
||||
vectordURL := mustParseUpstream("vectord_url", cfg.Gateway.VectordURL)
|
||||
embeddURL := mustParseUpstream("embedd_url", cfg.Gateway.EmbeddURL)
|
||||
pathwaydURL := mustParseUpstream("pathwayd_url", cfg.Gateway.PathwaydURL)
|
||||
matrixdURL := mustParseUpstream("matrixd_url", cfg.Gateway.MatrixdURL)
|
||||
|
||||
storagedProxy := gateway.NewProxyHandler(storagedURL)
|
||||
catalogdProxy := gateway.NewProxyHandler(catalogdURL)
|
||||
@ -73,6 +75,7 @@ func main() {
|
||||
vectordProxy := gateway.NewProxyHandler(vectordURL)
|
||||
embeddProxy := gateway.NewProxyHandler(embeddURL)
|
||||
pathwaydProxy := gateway.NewProxyHandler(pathwaydURL)
|
||||
matrixdProxy := gateway.NewProxyHandler(matrixdURL)
|
||||
|
||||
if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) {
|
||||
|
||||
@ -93,6 +96,8 @@ func main() {
|
||||
r.Handle("/v1/embed", embeddProxy)
|
||||
// Pathway memory — /v1/pathway/*
|
||||
r.Handle("/v1/pathway/*", pathwaydProxy)
|
||||
// Matrix indexer — /v1/matrix/* (multi-corpus retrieve+merge per SPEC §3.4)
|
||||
r.Handle("/v1/matrix/*", matrixdProxy)
|
||||
}, cfg.Auth); err != nil {
|
||||
slog.Error("server", "err", err)
|
||||
os.Exit(1)
|
||||
|
||||
125
cmd/matrixd/main.go
Normal file
125
cmd/matrixd/main.go
Normal file
@ -0,0 +1,125 @@
|
||||
// matrixd is the matrix indexer service. Wraps internal/matrix's
|
||||
// Retriever with HTTP routes per docs/SPEC.md §3.4.
|
||||
//
|
||||
// Routes:
|
||||
// POST /matrix/search — multi-corpus retrieve+merge
|
||||
// GET /matrix/corpora — list known vectord indexes (proxy)
|
||||
//
|
||||
// matrixd talks to embedd (for query-text embedding) and vectord
|
||||
// (for per-corpus search) via HTTP. Both URLs come from
|
||||
// [matrixd] config; gateway sets them to its own upstream URLs so
|
||||
// matrixd inherits the same provider topology.
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/matrix"
|
||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
||||
)
|
||||
|
||||
// maxRequestBytes caps request bodies accepted by decodeJSON; larger
// payloads are rejected with 413 before they can exhaust memory.
const maxRequestBytes = 4 << 20 // 4 MiB cap on request bodies
|
||||
func main() {
|
||||
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
||||
flag.Parse()
|
||||
|
||||
cfg, err := shared.LoadConfig(*configPath)
|
||||
if err != nil {
|
||||
slog.Error("config", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if cfg.Matrixd.EmbeddURL == "" || cfg.Matrixd.VectordURL == "" {
|
||||
slog.Error("matrixd: embedd_url and vectord_url required in [matrixd]")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
retriever := matrix.New(cfg.Matrixd.EmbeddURL, cfg.Matrixd.VectordURL)
|
||||
h := &handlers{r: retriever}
|
||||
|
||||
if err := shared.Run("matrixd", cfg.Matrixd.Bind, h.register, cfg.Auth); err != nil {
|
||||
slog.Error("server", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// handlers bundles the HTTP route handlers around a shared Retriever.
type handlers struct {
	r *matrix.Retriever // does the actual multi-corpus retrieve+merge
}

// register mounts the matrix routes on the service router; shared.Run
// passes its chi.Router here so middleware/auth wiring stays in shared.
func (h *handlers) register(r chi.Router) {
	r.Post("/matrix/search", h.handleSearch)
	r.Get("/matrix/corpora", h.handleCorpora)
}
|
||||
|
||||
func (h *handlers) handleSearch(w http.ResponseWriter, r *http.Request) {
|
||||
var req matrix.SearchRequest
|
||||
if !decodeJSON(w, r, &req) {
|
||||
return
|
||||
}
|
||||
resp, err := h.r.Search(r.Context(), req)
|
||||
if err != nil {
|
||||
writeMatrixError(w, err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (h *handlers) handleCorpora(w http.ResponseWriter, r *http.Request) {
|
||||
names, err := h.r.Corpora(r.Context())
|
||||
if err != nil {
|
||||
slog.Error("matrix corpora", "err", err)
|
||||
http.Error(w, "vectord unavailable", http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"corpora": names, "count": len(names)})
|
||||
}
|
||||
|
||||
func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {
|
||||
defer r.Body.Close()
|
||||
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
|
||||
if err := json.NewDecoder(r.Body).Decode(v); err != nil {
|
||||
var maxErr *http.MaxBytesError
|
||||
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
|
||||
http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
|
||||
return false
|
||||
}
|
||||
http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func writeJSON(w http.ResponseWriter, code int, v any) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(code)
|
||||
if err := json.NewEncoder(w).Encode(v); err != nil {
|
||||
slog.Warn("matrix write json", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// writeMatrixError maps internal/matrix sentinels to HTTP statuses.
|
||||
// Corpus / embed failures bubble up as 502 (the upstream service is
|
||||
// what's wrong); validation errors are 400.
|
||||
func writeMatrixError(w http.ResponseWriter, err error) {
|
||||
switch {
|
||||
case errors.Is(err, matrix.ErrEmptyCorpora),
|
||||
errors.Is(err, matrix.ErrEmptyQuery):
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
case errors.Is(err, matrix.ErrCorpus),
|
||||
errors.Is(err, matrix.ErrEmbed):
|
||||
slog.Warn("matrix upstream", "err", err)
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
default:
|
||||
slog.Error("matrix", "err", err)
|
||||
http.Error(w, "internal", http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
253
internal/matrix/retrieve.go
Normal file
253
internal/matrix/retrieve.go
Normal file
@ -0,0 +1,253 @@
|
||||
// Package matrix is the multi-corpus retrieval layer above vectord.
|
||||
// Per docs/SPEC.md §3.4: the matrix indexer composes N single-corpus
|
||||
// vectord indexes into one retrieve+merge surface, with corpus
|
||||
// attribution preserved per result. Future work in the same package:
|
||||
// relevance filter, strong-model downgrade gate, learning-loop
|
||||
// integration. This file is component 2 of the dependency-ordered
|
||||
// port plan — multi-corpus retrieve+merge, no filter yet.
|
||||
//
|
||||
// Why corpus-as-shard rather than hash-shard a single index:
|
||||
// different corpora have distinct topology and distinct retrieval
|
||||
// intent (workers vs candidates vs scrum_findings vs lakehouse_arch).
|
||||
// Multi-corpus search merges across them by distance — that IS the
|
||||
// matrix indexer's whole purpose. See feedback_meta_index_vision.md
|
||||
// and project_small_model_pipeline_vision.md.
|
||||
package matrix
|
||||
|
||||
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sort"
	"sync"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
)
|
||||
|
||||
// Result is one merged hit with corpus attribution. The corpus field
// is load-bearing — losing it would defeat the matrix's purpose
// (knowing WHICH corpus contributed each hit is half the signal).
type Result struct {
	ID       string          `json:"id"`       // vector id within its corpus
	Distance float32         `json:"distance"` // distance from the query vector (lower = closer)
	Corpus   string          `json:"corpus"`   // name of the vectord index this hit came from
	Metadata json.RawMessage `json:"metadata,omitempty"` // per-vector metadata, passed through untouched
}
|
||||
|
||||
// SearchRequest is the matrix search input. Either QueryText (matrix
// embeds it via embedd) or QueryVector (already embedded by caller)
// must be set; QueryVector takes precedence if both supplied.
type SearchRequest struct {
	QueryText   string    `json:"query_text,omitempty"`   // text to embed via embedd; ignored when QueryVector is set
	QueryVector []float32 `json:"query_vector,omitempty"` // pre-embedded query; wins over QueryText
	Corpora     []string  `json:"corpora"`                // vectord index names to search; must be non-empty
	K           int       `json:"k"`                      // global top-k after the merge; must be > 0
	PerCorpusK  int       `json:"per_corpus_k,omitempty"` // per-corpus top-k before merging; defaults to K
	Model       string    `json:"model,omitempty"`        // embedding model hint forwarded to embedd
}
|
||||
|
||||
// SearchResponse wraps the merged results plus per-corpus return
// counts so callers can detect "this corpus returned nothing"
// without re-querying.
type SearchResponse struct {
	Results         []Result       `json:"results"`           // merged hits, distance ascending, truncated to K
	PerCorpusCounts map[string]int `json:"per_corpus_counts"` // hits each corpus contributed before truncation
}
|
||||
|
||||
// Retriever holds the HTTP clients to embedd and vectord. Stateless
// otherwise — safe to share across goroutines.
type Retriever struct {
	httpClient *http.Client
	embeddURL  string
	vectordURL string
}

// New returns a Retriever configured to call embedd at embeddURL
// and vectord at vectordURL (both gateway-internal upstreams,
// usually 127.0.0.1:3216 and :3215 respectively).
func New(embeddURL, vectordURL string) *Retriever {
	r := &Retriever{
		embeddURL:  embeddURL,
		vectordURL: vectordURL,
	}
	// One shared client: connection reuse across corpus fan-outs, with a
	// hard 30s ceiling so a stuck upstream cannot wedge a search forever.
	r.httpClient = &http.Client{Timeout: 30 * time.Second}
	return r
}
|
||||
|
||||
// Errors surfaced to HTTP handlers. The first two are validation
// failures (mapped to 400 by cmd/matrixd); ErrCorpus and ErrEmbed mean
// an upstream service failed (mapped to 502). Match with errors.Is.
var (
	ErrEmptyCorpora = errors.New("matrix: corpora must be non-empty")
	ErrEmptyQuery   = errors.New("matrix: query_text or query_vector required")
	ErrCorpus       = errors.New("matrix: corpus search failed") // wraps vectord errors
	ErrEmbed        = errors.New("matrix: embed failed")
)
|
||||
|
||||
// Search runs the matrix retrieve+merge.
|
||||
//
|
||||
// Error policy: fail-loud on any corpus error. Silent partial results
|
||||
// would lie about what was actually searched, which defeats the
|
||||
// indexer's coverage guarantee. Callers that want best-effort can
|
||||
// catch the error and re-issue with a smaller corpora list.
|
||||
func (r *Retriever) Search(ctx context.Context, req SearchRequest) (*SearchResponse, error) {
|
||||
if len(req.Corpora) == 0 {
|
||||
return nil, ErrEmptyCorpora
|
||||
}
|
||||
if req.K <= 0 {
|
||||
return nil, errors.New("matrix: k must be > 0")
|
||||
}
|
||||
if req.PerCorpusK <= 0 {
|
||||
req.PerCorpusK = req.K
|
||||
}
|
||||
|
||||
// Resolve query → vector.
|
||||
qvec := req.QueryVector
|
||||
if len(qvec) == 0 {
|
||||
if req.QueryText == "" {
|
||||
return nil, ErrEmptyQuery
|
||||
}
|
||||
v, err := r.embed(ctx, req.QueryText, req.Model)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrEmbed, err)
|
||||
}
|
||||
qvec = v
|
||||
}
|
||||
|
||||
// Parallel search across corpora. Each shard is independent;
|
||||
// fan-out + collect with WaitGroup is cleaner than channels-only.
|
||||
type shardResult struct {
|
||||
corpus string
|
||||
hits []vectord.Result
|
||||
err error
|
||||
}
|
||||
results := make([]shardResult, len(req.Corpora))
|
||||
var wg sync.WaitGroup
|
||||
for i, c := range req.Corpora {
|
||||
wg.Add(1)
|
||||
go func(i int, corpus string) {
|
||||
defer wg.Done()
|
||||
hits, err := r.searchCorpus(ctx, corpus, qvec, req.PerCorpusK)
|
||||
results[i] = shardResult{corpus: corpus, hits: hits, err: err}
|
||||
}(i, c)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var allHits []Result
|
||||
perCorpus := make(map[string]int, len(req.Corpora))
|
||||
for _, s := range results {
|
||||
if s.err != nil {
|
||||
return nil, fmt.Errorf("%w: %s: %v", ErrCorpus, s.corpus, s.err)
|
||||
}
|
||||
perCorpus[s.corpus] = len(s.hits)
|
||||
for _, h := range s.hits {
|
||||
allHits = append(allHits, Result{
|
||||
ID: h.ID, Distance: h.Distance, Corpus: s.corpus, Metadata: h.Metadata,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Stable sort so equal-distance ties keep input order (which is
|
||||
// per-corpus order from vectord's HNSW result heap). This matters
|
||||
// for deterministic test assertions.
|
||||
sort.SliceStable(allHits, func(i, j int) bool {
|
||||
return allHits[i].Distance < allHits[j].Distance
|
||||
})
|
||||
if len(allHits) > req.K {
|
||||
allHits = allHits[:req.K]
|
||||
}
|
||||
return &SearchResponse{Results: allHits, PerCorpusCounts: perCorpus}, nil
|
||||
}
|
||||
|
||||
// Corpora returns the list of vectord index names. Thin proxy to
|
||||
// GET /vectors/index — exposed at the matrix layer so callers don't
|
||||
// need direct vectord access.
|
||||
func (r *Retriever) Corpora(ctx context.Context) ([]string, error) {
|
||||
url := r.vectordURL + "/vectors/index"
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := r.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("vectord index list: status %d: %s", resp.StatusCode, b)
|
||||
}
|
||||
var out struct {
|
||||
Names []string `json:"names"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out.Names, nil
|
||||
}
|
||||
|
||||
// embed POSTs a single-text /embed call. Reuses embedd's batched
|
||||
// /embed shape with len(texts)==1; embedd's LRU cache absorbs
|
||||
// repeat queries (commit 56844c3).
|
||||
func (r *Retriever) embed(ctx context.Context, text, model string) ([]float32, error) {
|
||||
body, err := json.Marshal(map[string]any{"texts": []string{text}, "model": model})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, r.embeddURL+"/embed", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
resp, err := r.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("embed status %d: %s", resp.StatusCode, b)
|
||||
}
|
||||
var out struct {
|
||||
Vectors [][]float32 `json:"vectors"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(out.Vectors) == 0 {
|
||||
return nil, errors.New("embed returned no vectors")
|
||||
}
|
||||
return out.Vectors[0], nil
|
||||
}
|
||||
|
||||
// searchCorpus calls vectord /vectors/index/{name}/search.
|
||||
func (r *Retriever) searchCorpus(ctx context.Context, corpus string, vec []float32, k int) ([]vectord.Result, error) {
|
||||
body, err := json.Marshal(map[string]any{"vector": vec, "k": k})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
url := r.vectordURL + "/vectors/index/" + corpus + "/search"
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
resp, err := r.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, b)
|
||||
}
|
||||
var out struct {
|
||||
Results []vectord.Result `json:"results"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out.Results, nil
|
||||
}
|
||||
@ -27,6 +27,7 @@ type Config struct {
|
||||
Vectord VectordConfig `toml:"vectord"`
|
||||
Embedd EmbeddConfig `toml:"embedd"`
|
||||
Pathwayd PathwaydConfig `toml:"pathwayd"`
|
||||
Matrixd MatrixdConfig `toml:"matrixd"`
|
||||
S3 S3Config `toml:"s3"`
|
||||
Log LogConfig `toml:"log"`
|
||||
Auth AuthConfig `toml:"auth"`
|
||||
@ -51,9 +52,9 @@ type IngestConfig struct {
|
||||
|
||||
// GatewayConfig adds the upstream URLs the reverse proxy fronts.
|
||||
// Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql,
|
||||
// /v1/vectors, /v1/embed, /v1/pathway) has its own upstream so we can
|
||||
// scale services independently or move them to different boxes without
|
||||
// touching gateway code.
|
||||
// /v1/vectors, /v1/embed, /v1/pathway, /v1/matrix) has its own
|
||||
// upstream so we can scale services independently or move them to
|
||||
// different boxes without touching gateway code.
|
||||
type GatewayConfig struct {
|
||||
Bind string `toml:"bind"`
|
||||
StoragedURL string `toml:"storaged_url"`
|
||||
@ -63,6 +64,7 @@ type GatewayConfig struct {
|
||||
VectordURL string `toml:"vectord_url"`
|
||||
EmbeddURL string `toml:"embedd_url"`
|
||||
PathwaydURL string `toml:"pathwayd_url"`
|
||||
MatrixdURL string `toml:"matrixd_url"`
|
||||
}
|
||||
|
||||
// EmbeddConfig drives the embed service. ProviderURL points at the
|
||||
@ -96,6 +98,16 @@ type PathwaydConfig struct {
|
||||
PersistPath string `toml:"persist_path"`
|
||||
}
|
||||
|
||||
// MatrixdConfig drives the matrix-indexer service (cmd/matrixd).
// Per docs/SPEC.md §3.4: multi-corpus retrieve+merge over vectord
// with embed-via-embedd for query text. Both upstream URLs are
// required — matrixd has no in-process fallback.
type MatrixdConfig struct {
	Bind       string `toml:"bind"`        // listen address (default 127.0.0.1:3218)
	EmbeddURL  string `toml:"embedd_url"`  // embedd upstream for query-text embedding; required
	VectordURL string `toml:"vectord_url"` // vectord upstream for per-corpus search; required
}
|
||||
|
||||
// QuerydConfig adds queryd-specific knobs. queryd talks DuckDB
|
||||
// directly to MinIO via DuckDB's httpfs extension (so no storaged
|
||||
// URL needed), and reads the catalog over HTTP for view registration.
|
||||
@ -173,6 +185,7 @@ func DefaultConfig() Config {
|
||||
VectordURL: "http://127.0.0.1:3215",
|
||||
EmbeddURL: "http://127.0.0.1:3216",
|
||||
PathwaydURL: "http://127.0.0.1:3217",
|
||||
MatrixdURL: "http://127.0.0.1:3218",
|
||||
},
|
||||
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
|
||||
Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
|
||||
@ -197,6 +210,11 @@ func DefaultConfig() Config {
|
||||
// PersistPath empty by default = in-memory only. Production
|
||||
// sets to e.g. /var/lib/lakehouse/pathway/state.jsonl.
|
||||
},
|
||||
Matrixd: MatrixdConfig{
|
||||
Bind: "127.0.0.1:3218",
|
||||
EmbeddURL: "http://127.0.0.1:3216",
|
||||
VectordURL: "http://127.0.0.1:3215",
|
||||
},
|
||||
Queryd: QuerydConfig{
|
||||
Bind: "127.0.0.1:3214",
|
||||
CatalogdURL: "http://127.0.0.1:3212",
|
||||
|
||||
@ -13,6 +13,7 @@ queryd_url = "http://127.0.0.1:3214"
|
||||
vectord_url = "http://127.0.0.1:3215"
|
||||
embedd_url = "http://127.0.0.1:3216"
|
||||
pathwayd_url = "http://127.0.0.1:3217"
|
||||
matrixd_url = "http://127.0.0.1:3218"
|
||||
|
||||
[storaged]
|
||||
bind = "127.0.0.1:3211"
|
||||
@ -54,6 +55,14 @@ bind = "127.0.0.1:3217"
|
||||
# /var/lib/lakehouse/pathway/state.jsonl so traces survive restart.
|
||||
persist_path = ""
|
||||
|
||||
[matrixd]
|
||||
bind = "127.0.0.1:3218"
|
||||
# matrixd calls embedd (query-text → vector) and vectord (per-corpus
|
||||
# search) directly. Localhost defaults; in distributed deployments
|
||||
# these point at the gateway's upstream addresses.
|
||||
embedd_url = "http://127.0.0.1:3216"
|
||||
vectord_url = "http://127.0.0.1:3215"
|
||||
|
||||
[s3]
|
||||
endpoint = "http://localhost:9000"
|
||||
region = "us-east-1"
|
||||
|
||||
213
scripts/matrix_smoke.sh
Executable file
213
scripts/matrix_smoke.sh
Executable file
@ -0,0 +1,213 @@
|
||||
#!/usr/bin/env bash
# Matrix smoke — multi-corpus retrieve+merge via matrixd (SPEC §3.4).
# All assertions go through gateway :3110.
#
# Validates:
#  - Multi-corpus search returns hits from BOTH corpora
#  - Each result carries its corpus attribution (load-bearing — losing
#    it defeats the matrix's purpose)
#  - Merged top-k is ordered by distance across corpora
#  - /matrix/corpora lists known indexes
#  - Empty corpora list → 400
#  - Bad corpus name → 502 (matrix bubbles vectord's 404 as upstream error)
#
# Uses query_vector (not query_text) to skip the embedd dependency so
# this smoke runs without Ollama. End-to-end embed→matrix→search has
# its own integration test (next commit).
#
# Usage: ./scripts/matrix_smoke.sh

set -euo pipefail
cd "$(dirname "$0")/.."

export PATH="$PATH:/usr/local/go/bin"

echo "[matrix-smoke] building matrixd + vectord + gateway..."
go build -o bin/ ./cmd/matrixd ./cmd/vectord ./cmd/gateway

# Kill any leftovers from a previous run before rebinding the ports.
pkill -f "bin/(matrixd|vectord|gateway)" 2>/dev/null || true
sleep 0.3

PIDS=()
TMP="$(mktemp -d)"
CFG="$TMP/matrix.toml"

# cleanup tears down every service we started and removes the temp dir.
# NOTE(review): under `set -u`, expanding "${PIDS[@]}" while the array is
# still empty errors on bash < 4.4 — confirm the target bash version.
cleanup() {
    echo "[matrix-smoke] cleanup"
    for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done
    rm -rf "$TMP"
}
trap cleanup EXIT INT TERM
|
||||
|
||||
# Custom toml: vectord persistence disabled (don't pollute storaged
# state with the test corpora).
cat > "$CFG" <<EOF
[gateway]
bind = "127.0.0.1:3110"
storaged_url = "http://127.0.0.1:3211"
catalogd_url = "http://127.0.0.1:3212"
ingestd_url = "http://127.0.0.1:3213"
queryd_url = "http://127.0.0.1:3214"
vectord_url = "http://127.0.0.1:3215"
embedd_url = "http://127.0.0.1:3216"
pathwayd_url = "http://127.0.0.1:3217"
matrixd_url = "http://127.0.0.1:3218"

[vectord]
bind = "127.0.0.1:3215"
storaged_url = ""

[matrixd]
bind = "127.0.0.1:3218"
embedd_url = "http://127.0.0.1:3216"
vectord_url = "http://127.0.0.1:3215"
EOF

# poll_health polls http://127.0.0.1:$1/health for up to 5 seconds,
# returning 0 as soon as the endpoint answers and 1 on timeout.
poll_health() {
    local port="$1" deadline=$(($(date +%s) + 5))
    while [ "$(date +%s)" -lt "$deadline" ]; do
        if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
        sleep 0.05
    done
    return 1
}
|
||||
|
||||
# Start order matters: matrixd health-checks nothing at boot, but
# gateway must come up last so every proxied route has a live upstream.
echo "[matrix-smoke] launching vectord → matrixd → gateway..."
./bin/vectord -config "$CFG" > /tmp/vectord.log 2>&1 &
PIDS+=($!)
poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; exit 1; }

./bin/matrixd -config "$CFG" > /tmp/matrixd.log 2>&1 &
PIDS+=($!)
poll_health 3218 || { echo "matrixd failed"; tail /tmp/matrixd.log; exit 1; }

./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 &
PIDS+=($!)
poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }

FAILED=0
DIM=4

# Create two corpora — corpus_a and corpus_b — each with a few
# vectors at known distances from a chosen query vector.
echo "[matrix-smoke] create two corpora:"
for c in corpus_a corpus_b; do
    HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/vectors/index \
        -H 'Content-Type: application/json' \
        -d "{\"name\":\"$c\",\"dimension\":$DIM,\"distance\":\"euclidean\"}")"
    if [ "$HTTP" != "201" ]; then echo " ✗ create $c → $HTTP"; FAILED=1; fi
done
echo " ✓ corpus_a and corpus_b created"

# Add vectors. Use euclidean distance for predictable arithmetic.
# Query vector will be [1,0,0,0]. Distances from it:
#   corpus_a/a-near : [1.1, 0, 0, 0]  ≈ 0.1
#   corpus_a/a-mid  : [1, 0.5, 0, 0]  ≈ 0.5
#   corpus_a/a-far  : [3, 0, 0, 0]    ≈ 2.0
#   corpus_b/b-near : [1.05, 0, 0, 0] ≈ 0.05  (closest globally)
#   corpus_b/b-mid  : [1, 0.7, 0, 0]  ≈ 0.7
#   corpus_b/b-far  : [4, 0, 0, 0]    ≈ 3.0
echo "[matrix-smoke] add vectors to both corpora:"
curl -sS -o /dev/null -X POST "http://127.0.0.1:3110/v1/vectors/index/corpus_a/add" \
    -H 'Content-Type: application/json' \
    -d '{"items":[
        {"id":"a-near","vector":[1.1,0,0,0],"metadata":{"label":"a near"}},
        {"id":"a-mid","vector":[1,0.5,0,0],"metadata":{"label":"a mid"}},
        {"id":"a-far","vector":[3,0,0,0],"metadata":{"label":"a far"}}
    ]}'
curl -sS -o /dev/null -X POST "http://127.0.0.1:3110/v1/vectors/index/corpus_b/add" \
    -H 'Content-Type: application/json' \
    -d '{"items":[
        {"id":"b-near","vector":[1.05,0,0,0],"metadata":{"label":"b near"}},
        {"id":"b-mid","vector":[1,0.7,0,0],"metadata":{"label":"b mid"}},
        {"id":"b-far","vector":[4,0,0,0],"metadata":{"label":"b far"}}
    ]}'
echo " ✓ 3 + 3 vectors loaded"
|
||||
|
||||
# ── 1. /matrix/corpora lists both ─────────────────────────────────
echo "[matrix-smoke] /matrix/corpora lists both:"
RESP="$(curl -sS http://127.0.0.1:3110/v1/matrix/corpora)"
COUNT="$(echo "$RESP" | jq -r '.count')"
HAS_A="$(echo "$RESP" | jq -r '.corpora | index("corpus_a") != null')"
HAS_B="$(echo "$RESP" | jq -r '.corpora | index("corpus_b") != null')"
if [ "$COUNT" = "2" ] && [ "$HAS_A" = "true" ] && [ "$HAS_B" = "true" ]; then
    echo " ✓ count=2, both corpora listed"
else
    echo " ✗ resp: $RESP"; FAILED=1
fi

# ── 2. multi-corpus search returns hits from BOTH ─────────────────
echo "[matrix-smoke] /matrix/search multi-corpus retrieve+merge:"
RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/matrix/search \
    -H 'Content-Type: application/json' \
    -d '{"query_vector":[1,0,0,0],"corpora":["corpus_a","corpus_b"],"k":4,"per_corpus_k":3}')"
RESULTS_LEN="$(echo "$RESP" | jq -r '.results | length')"
A_COUNT="$(echo "$RESP" | jq -r '.per_corpus_counts.corpus_a')"
B_COUNT="$(echo "$RESP" | jq -r '.per_corpus_counts.corpus_b')"
HAS_A_RESULT="$(echo "$RESP" | jq -r '[.results[] | select(.corpus=="corpus_a")] | length > 0')"
HAS_B_RESULT="$(echo "$RESP" | jq -r '[.results[] | select(.corpus=="corpus_b")] | length > 0')"
if [ "$RESULTS_LEN" = "4" ] && [ "$A_COUNT" = "3" ] && [ "$B_COUNT" = "3" ] && [ "$HAS_A_RESULT" = "true" ] && [ "$HAS_B_RESULT" = "true" ]; then
    echo " ✓ 4 merged results · 3+3 per-corpus · both corpora represented"
else
    echo " ✗ len=$RESULTS_LEN per_corpus={a:$A_COUNT b:$B_COUNT} a_hit=$HAS_A_RESULT b_hit=$HAS_B_RESULT"
    echo " full: $RESP"
    FAILED=1
fi

# ── 3. distance-merged top-k correct across corpora ───────────────
# b-near sits at distance 0.05 vs a-near's 0.1, so a correct cross-corpus
# merge MUST rank it first — this is the assertion that proves the merge.
echo "[matrix-smoke] top hit comes from corpus_b (b-near is globally closest):"
TOP_ID="$(echo "$RESP" | jq -r '.results[0].id')"
TOP_CORPUS="$(echo "$RESP" | jq -r '.results[0].corpus')"
if [ "$TOP_ID" = "b-near" ] && [ "$TOP_CORPUS" = "corpus_b" ]; then
    echo " ✓ top hit: id=b-near corpus=corpus_b (closer than corpus_a's a-near)"
else
    echo " ✗ top: id=$TOP_ID corpus=$TOP_CORPUS (expected b-near/corpus_b)"
    FAILED=1
fi

# ── 4. corpus attribution preserved in metadata ───────────────────
echo "[matrix-smoke] metadata preserved on merged results:"
TOP_LABEL="$(echo "$RESP" | jq -r '.results[0].metadata.label')"
if [ "$TOP_LABEL" = "b near" ]; then
    echo " ✓ metadata.label round-trips through matrix"
else
    echo " ✗ label=$TOP_LABEL"; FAILED=1
fi

# ── 5. distances ascending in result list ─────────────────────────
echo "[matrix-smoke] results sorted by distance ascending:"
ASCENDING="$(echo "$RESP" | jq -r '[.results[].distance] | . == (sort)')"
if [ "$ASCENDING" = "true" ]; then
    echo " ✓ distances ascending"
else
    echo " ✗ distances not sorted: $(echo "$RESP" | jq -c '[.results[].distance]')"
    FAILED=1
fi

# ── 6. negative paths ─────────────────────────────────────────────
echo "[matrix-smoke] empty corpora → 400:"
HTTP_400="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/matrix/search \
    -H 'Content-Type: application/json' \
    -d '{"query_vector":[1,0,0,0],"corpora":[],"k":4}')"
echo "[matrix-smoke] missing corpus name → 502:"
HTTP_502="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/matrix/search \
    -H 'Content-Type: application/json' \
    -d '{"query_vector":[1,0,0,0],"corpora":["does_not_exist"],"k":4}')"
echo "[matrix-smoke] no query (empty text and vector) → 400:"
HTTP_400b="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/matrix/search \
    -H 'Content-Type: application/json' \
    -d '{"corpora":["corpus_a"],"k":4}')"
if [ "$HTTP_400" = "400" ] && [ "$HTTP_502" = "502" ] && [ "$HTTP_400b" = "400" ]; then
    echo " ✓ empty=400, missing-corpus=502, no-query=400"
else
    echo " ✗ empty=$HTTP_400 missing=$HTTP_502 noquery=$HTTP_400b"
    FAILED=1
fi

if [ "$FAILED" -eq 0 ]; then
    echo "[matrix-smoke] Matrix acceptance gate: PASSED"
    exit 0
else
    echo "[matrix-smoke] Matrix acceptance gate: FAILED"
    exit 1
fi
|
||||
Loading…
x
Reference in New Issue
Block a user