diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go
index 7e41318..2ee6e62 100644
--- a/cmd/gateway/main.go
+++ b/cmd/gateway/main.go
@@ -45,6 +45,7 @@ func main() {
 		"vectord_url":  cfg.Gateway.VectordURL,
 		"embedd_url":   cfg.Gateway.EmbeddURL,
 		"pathwayd_url": cfg.Gateway.PathwaydURL,
+		"matrixd_url":  cfg.Gateway.MatrixdURL,
 	}
 	for k, v := range upstreams {
 		if v == "" {
@@ -65,6 +66,7 @@ func main() {
 	vectordURL := mustParseUpstream("vectord_url", cfg.Gateway.VectordURL)
 	embeddURL := mustParseUpstream("embedd_url", cfg.Gateway.EmbeddURL)
 	pathwaydURL := mustParseUpstream("pathwayd_url", cfg.Gateway.PathwaydURL)
+	matrixdURL := mustParseUpstream("matrixd_url", cfg.Gateway.MatrixdURL)
 
 	storagedProxy := gateway.NewProxyHandler(storagedURL)
 	catalogdProxy := gateway.NewProxyHandler(catalogdURL)
@@ -73,6 +75,7 @@ func main() {
 	vectordProxy := gateway.NewProxyHandler(vectordURL)
 	embeddProxy := gateway.NewProxyHandler(embeddURL)
 	pathwaydProxy := gateway.NewProxyHandler(pathwaydURL)
+	matrixdProxy := gateway.NewProxyHandler(matrixdURL)
 
 	if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) {
@@ -93,6 +96,8 @@ func main() {
 		r.Handle("/v1/embed", embeddProxy)
 		// Pathway memory — /v1/pathway/*
 		r.Handle("/v1/pathway/*", pathwaydProxy)
+		// Matrix indexer — /v1/matrix/* (multi-corpus retrieve+merge per SPEC §3.4)
+		r.Handle("/v1/matrix/*", matrixdProxy)
 	}, cfg.Auth); err != nil {
 		slog.Error("server", "err", err)
 		os.Exit(1)
diff --git a/cmd/matrixd/main.go b/cmd/matrixd/main.go
new file mode 100644
index 0000000..427a3f5
--- /dev/null
+++ b/cmd/matrixd/main.go
@@ -0,0 +1,126 @@
+// matrixd is the matrix indexer service. Wraps internal/matrix's
+// Retriever with HTTP routes per docs/SPEC.md §3.4.
+//
+// Routes:
+//	POST /matrix/search  — multi-corpus retrieve+merge
+//	GET  /matrix/corpora — list known vectord indexes (proxy)
+//
+// matrixd talks to embedd (for query-text embedding) and vectord
+// (for per-corpus search) via HTTP. Both URLs come from
+// [matrixd] config; gateway sets them to its own upstream URLs so
+// matrixd inherits the same provider topology.
+package main
+
+import (
+	"encoding/json"
+	"errors"
+	"flag"
+	"log/slog"
+	"net/http"
+	"os"
+	"strings"
+
+	"github.com/go-chi/chi/v5"
+
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/matrix"
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
+)
+
+const maxRequestBytes = 4 << 20 // 4 MiB cap on request bodies
+
+func main() {
+	configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
+	flag.Parse()
+
+	cfg, err := shared.LoadConfig(*configPath)
+	if err != nil {
+		slog.Error("config", "err", err)
+		os.Exit(1)
+	}
+	if cfg.Matrixd.EmbeddURL == "" || cfg.Matrixd.VectordURL == "" {
+		slog.Error("matrixd: embedd_url and vectord_url required in [matrixd]")
+		os.Exit(1)
+	}
+
+	retriever := matrix.New(cfg.Matrixd.EmbeddURL, cfg.Matrixd.VectordURL)
+	h := &handlers{r: retriever}
+
+	if err := shared.Run("matrixd", cfg.Matrixd.Bind, h.register, cfg.Auth); err != nil {
+		slog.Error("server", "err", err)
+		os.Exit(1)
+	}
+}
+
+type handlers struct {
+	r *matrix.Retriever
+}
+
+func (h *handlers) register(r chi.Router) {
+	r.Post("/matrix/search", h.handleSearch)
+	r.Get("/matrix/corpora", h.handleCorpora)
+}
+
+func (h *handlers) handleSearch(w http.ResponseWriter, r *http.Request) {
+	var req matrix.SearchRequest
+	if !decodeJSON(w, r, &req) {
+		return
+	}
+	resp, err := h.r.Search(r.Context(), req)
+	if err != nil {
+		writeMatrixError(w, err)
+		return
+	}
+	writeJSON(w, http.StatusOK, resp)
+}
+
+func (h *handlers) handleCorpora(w http.ResponseWriter, r *http.Request) {
+	names, err := h.r.Corpora(r.Context())
+	if err != nil {
+		slog.Error("matrix corpora", "err", err)
+		http.Error(w, "vectord unavailable", http.StatusBadGateway)
+		return
+	}
+	writeJSON(w, http.StatusOK, map[string]any{"corpora": names, "count": len(names)})
+}
+
+// decodeJSON decodes a size-capped JSON request body into v. The
+// MaxBytesReader wrapper is installed BEFORE the deferred Close so
+// the limiter (which closes the underlying body) is what gets closed.
+func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {
+	r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
+	defer r.Body.Close()
+	if err := json.NewDecoder(r.Body).Decode(v); err != nil {
+		var maxErr *http.MaxBytesError
+		if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
+			http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
+			return false
+		}
+		http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
+		return false
+	}
+	return true
+}
+
+func writeJSON(w http.ResponseWriter, code int, v any) {
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(code)
+	if err := json.NewEncoder(w).Encode(v); err != nil {
+		slog.Warn("matrix write json", "err", err)
+	}
+}
+
+// writeMatrixError maps internal/matrix sentinels to HTTP statuses.
+// Corpus / embed failures bubble up as 502 (the upstream service is
+// what's wrong); validation errors are 400.
+func writeMatrixError(w http.ResponseWriter, err error) {
+	switch {
+	case errors.Is(err, matrix.ErrEmptyCorpora),
+		errors.Is(err, matrix.ErrEmptyQuery),
+		errors.Is(err, matrix.ErrBadK):
+		http.Error(w, err.Error(), http.StatusBadRequest)
+	case errors.Is(err, matrix.ErrCorpus),
+		errors.Is(err, matrix.ErrEmbed):
+		slog.Warn("matrix upstream", "err", err)
+		http.Error(w, err.Error(), http.StatusBadGateway)
+	default:
+		slog.Error("matrix", "err", err)
+		http.Error(w, "internal", http.StatusInternalServerError)
+	}
+}
diff --git a/internal/matrix/retrieve.go b/internal/matrix/retrieve.go
new file mode 100644
index 0000000..8b1c9ed
--- /dev/null
+++ b/internal/matrix/retrieve.go
@@ -0,0 +1,254 @@
+// Package matrix is the multi-corpus retrieval layer above vectord.
+// Per docs/SPEC.md §3.4: the matrix indexer composes N single-corpus
+// vectord indexes into one retrieve+merge surface, with corpus
+// attribution preserved per result. Future work in the same package:
+// relevance filter, strong-model downgrade gate, learning-loop
+// integration. This file is component 2 of the dependency-ordered
+// port plan — multi-corpus retrieve+merge, no filter yet.
+//
+// Why corpus-as-shard rather than hash-shard a single index:
+// different corpora have distinct topology and distinct retrieval
+// intent (workers vs candidates vs scrum_findings vs lakehouse_arch).
+// Multi-corpus search merges across them by distance — that IS the
+// matrix indexer's whole purpose. See feedback_meta_index_vision.md
+// and project_small_model_pipeline_vision.md.
+package matrix
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"sort"
+	"sync"
+	"time"
+
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
+)
+
+// Result is one merged hit with corpus attribution. The corpus field
+// is load-bearing — losing it would defeat the matrix's purpose
+// (knowing WHICH corpus contributed each hit is half the signal).
+type Result struct {
+	ID       string          `json:"id"`
+	Distance float32         `json:"distance"`
+	Corpus   string          `json:"corpus"`
+	Metadata json.RawMessage `json:"metadata,omitempty"`
+}
+
+// SearchRequest is the matrix search input. Either QueryText (matrix
+// embeds it via embedd) or QueryVector (already embedded by caller)
+// must be set; QueryVector takes precedence if both supplied.
+type SearchRequest struct {
+	QueryText   string    `json:"query_text,omitempty"`
+	QueryVector []float32 `json:"query_vector,omitempty"`
+	Corpora     []string  `json:"corpora"`
+	K           int       `json:"k"`
+	PerCorpusK  int       `json:"per_corpus_k,omitempty"`
+	Model       string    `json:"model,omitempty"`
+}
+
+// SearchResponse wraps the merged results plus per-corpus return
+// counts so callers can detect "this corpus returned nothing"
+// without re-querying.
+type SearchResponse struct {
+	Results         []Result       `json:"results"`
+	PerCorpusCounts map[string]int `json:"per_corpus_counts"`
+}
+
+// Retriever holds the HTTP clients to embedd and vectord. Stateless
+// otherwise — safe to share across goroutines.
+type Retriever struct {
+	httpClient *http.Client
+	embeddURL  string
+	vectordURL string
+}
+
+// New returns a Retriever configured to call embedd at embeddURL
+// and vectord at vectordURL (both gateway-internal upstreams,
+// usually 127.0.0.1:3216 and :3215 respectively).
+func New(embeddURL, vectordURL string) *Retriever {
+	return &Retriever{
+		httpClient: &http.Client{Timeout: 30 * time.Second},
+		embeddURL:  embeddURL,
+		vectordURL: vectordURL,
+	}
+}
+
+// Errors surfaced to HTTP handlers.
+var (
+	ErrEmptyCorpora = errors.New("matrix: corpora must be non-empty")
+	ErrEmptyQuery   = errors.New("matrix: query_text or query_vector required")
+	ErrBadK         = errors.New("matrix: k must be > 0")
+	ErrCorpus       = errors.New("matrix: corpus search failed") // wraps vectord errors
+	ErrEmbed        = errors.New("matrix: embed failed")
+)
+
+// Search runs the matrix retrieve+merge.
+//
+// Error policy: fail-loud on any corpus error. Silent partial results
+// would lie about what was actually searched, which defeats the
+// indexer's coverage guarantee. Callers that want best-effort can
+// catch the error and re-issue with a smaller corpora list.
+func (r *Retriever) Search(ctx context.Context, req SearchRequest) (*SearchResponse, error) {
+	if len(req.Corpora) == 0 {
+		return nil, ErrEmptyCorpora
+	}
+	if req.K <= 0 {
+		// Sentinel (not a bare error) so the HTTP layer maps this
+		// validation failure to 400, matching the other two checks.
+		return nil, ErrBadK
+	}
+	if req.PerCorpusK <= 0 {
+		req.PerCorpusK = req.K
+	}
+
+	// Resolve query → vector.
+	qvec := req.QueryVector
+	if len(qvec) == 0 {
+		if req.QueryText == "" {
+			return nil, ErrEmptyQuery
+		}
+		v, err := r.embed(ctx, req.QueryText, req.Model)
+		if err != nil {
+			return nil, fmt.Errorf("%w: %v", ErrEmbed, err)
+		}
+		qvec = v
+	}
+
+	// Parallel search across corpora. Each shard is independent;
+	// fan-out + collect with WaitGroup is cleaner than channels-only.
+	type shardResult struct {
+		corpus string
+		hits   []vectord.Result
+		err    error
+	}
+	results := make([]shardResult, len(req.Corpora))
+	var wg sync.WaitGroup
+	for i, c := range req.Corpora {
+		wg.Add(1)
+		go func(i int, corpus string) {
+			defer wg.Done()
+			hits, err := r.searchCorpus(ctx, corpus, qvec, req.PerCorpusK)
+			results[i] = shardResult{corpus: corpus, hits: hits, err: err}
+		}(i, c)
+	}
+	wg.Wait()
+
+	var allHits []Result
+	perCorpus := make(map[string]int, len(req.Corpora))
+	for _, s := range results {
+		if s.err != nil {
+			return nil, fmt.Errorf("%w: %s: %v", ErrCorpus, s.corpus, s.err)
+		}
+		perCorpus[s.corpus] = len(s.hits)
+		for _, h := range s.hits {
+			allHits = append(allHits, Result{
+				ID: h.ID, Distance: h.Distance, Corpus: s.corpus, Metadata: h.Metadata,
+			})
+		}
+	}
+
+	// Stable sort so equal-distance ties keep input order (which is
+	// per-corpus order from vectord's HNSW result heap). This matters
+	// for deterministic test assertions.
+	sort.SliceStable(allHits, func(i, j int) bool {
+		return allHits[i].Distance < allHits[j].Distance
+	})
+	if len(allHits) > req.K {
+		allHits = allHits[:req.K]
+	}
+	return &SearchResponse{Results: allHits, PerCorpusCounts: perCorpus}, nil
+}
+
+// Corpora returns the list of vectord index names. Thin proxy to
+// GET /vectors/index — exposed at the matrix layer so callers don't
+// need direct vectord access.
+func (r *Retriever) Corpora(ctx context.Context) ([]string, error) {
+	url := r.vectordURL + "/vectors/index"
+	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := r.httpClient.Do(httpReq)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		b, _ := io.ReadAll(resp.Body)
+		return nil, fmt.Errorf("vectord index list: status %d: %s", resp.StatusCode, b)
+	}
+	var out struct {
+		Names []string `json:"names"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+		return nil, err
+	}
+	return out.Names, nil
+}
+
+// embed POSTs a single-text /embed call. Reuses embedd's batched
+// /embed shape with len(texts)==1; embedd's LRU cache absorbs
+// repeat queries (commit 56844c3).
+func (r *Retriever) embed(ctx context.Context, text, model string) ([]float32, error) {
+	body, err := json.Marshal(map[string]any{"texts": []string{text}, "model": model})
+	if err != nil {
+		return nil, err
+	}
+	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, r.embeddURL+"/embed", bytes.NewReader(body))
+	if err != nil {
+		return nil, err
+	}
+	httpReq.Header.Set("Content-Type", "application/json")
+	resp, err := r.httpClient.Do(httpReq)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		b, _ := io.ReadAll(resp.Body)
+		return nil, fmt.Errorf("embed status %d: %s", resp.StatusCode, b)
+	}
+	var out struct {
+		Vectors [][]float32 `json:"vectors"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+		return nil, err
+	}
+	if len(out.Vectors) == 0 {
+		return nil, errors.New("embed returned no vectors")
+	}
+	return out.Vectors[0], nil
+}
+
+// searchCorpus calls vectord /vectors/index/{name}/search.
+func (r *Retriever) searchCorpus(ctx context.Context, corpus string, vec []float32, k int) ([]vectord.Result, error) {
+	body, err := json.Marshal(map[string]any{"vector": vec, "k": k})
+	if err != nil {
+		return nil, err
+	}
+	url := r.vectordURL + "/vectors/index/" + corpus + "/search"
+	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
+	if err != nil {
+		return nil, err
+	}
+	httpReq.Header.Set("Content-Type", "application/json")
+	resp, err := r.httpClient.Do(httpReq)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		b, _ := io.ReadAll(resp.Body)
+		return nil, fmt.Errorf("status %d: %s", resp.StatusCode, b)
+	}
+	var out struct {
+		Results []vectord.Result `json:"results"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+		return nil, err
+	}
+	return out.Results, nil
+}
diff --git a/internal/shared/config.go b/internal/shared/config.go
index 1f1d2f6..7268b25 100644
--- a/internal/shared/config.go
+++ b/internal/shared/config.go
@@ -27,6 +27,7 @@ type Config struct {
 	Vectord  VectordConfig  `toml:"vectord"`
 	Embedd   EmbeddConfig   `toml:"embedd"`
 	Pathwayd PathwaydConfig `toml:"pathwayd"`
+	Matrixd  MatrixdConfig  `toml:"matrixd"`
 	S3       S3Config       `toml:"s3"`
 	Log      LogConfig      `toml:"log"`
 	Auth     AuthConfig     `toml:"auth"`
@@ -51,9 +52,9 @@ type IngestConfig struct {
 
 // GatewayConfig adds the upstream URLs the reverse proxy fronts.
 // Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql,
-// /v1/vectors, /v1/embed, /v1/pathway) has its own upstream so we can
-// scale services independently or move them to different boxes without
-// touching gateway code.
+// /v1/vectors, /v1/embed, /v1/pathway, /v1/matrix) has its own
+// upstream so we can scale services independently or move them to
+// different boxes without touching gateway code.
 type GatewayConfig struct {
 	Bind        string `toml:"bind"`
 	StoragedURL string `toml:"storaged_url"`
@@ -63,6 +64,7 @@ type GatewayConfig struct {
 	VectordURL  string `toml:"vectord_url"`
 	EmbeddURL   string `toml:"embedd_url"`
 	PathwaydURL string `toml:"pathwayd_url"`
+	MatrixdURL  string `toml:"matrixd_url"`
 }
 
 // EmbeddConfig drives the embed service. ProviderURL points at the
@@ -96,6 +98,16 @@ type PathwaydConfig struct {
 	PersistPath string `toml:"persist_path"`
 }
 
+// MatrixdConfig drives the matrix-indexer service (cmd/matrixd).
+// Per docs/SPEC.md §3.4: multi-corpus retrieve+merge over vectord
+// with embed-via-embedd for query text. Both upstream URLs are
+// required — matrixd has no in-process fallback.
+type MatrixdConfig struct {
+	Bind       string `toml:"bind"`
+	EmbeddURL  string `toml:"embedd_url"`
+	VectordURL string `toml:"vectord_url"`
+}
+
 // QuerydConfig adds queryd-specific knobs. queryd talks DuckDB
 // directly to MinIO via DuckDB's httpfs extension (so no storaged
 // URL needed), and reads the catalog over HTTP for view registration.
@@ -173,6 +185,7 @@ func DefaultConfig() Config {
 			VectordURL:  "http://127.0.0.1:3215",
 			EmbeddURL:   "http://127.0.0.1:3216",
 			PathwaydURL: "http://127.0.0.1:3217",
+			MatrixdURL:  "http://127.0.0.1:3218",
 		},
 		Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
 		Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
@@ -197,6 +210,11 @@ func DefaultConfig() Config {
 			// PersistPath empty by default = in-memory only. Production
 			// sets to e.g. /var/lib/lakehouse/pathway/state.jsonl.
 		},
+		Matrixd: MatrixdConfig{
+			Bind:       "127.0.0.1:3218",
+			EmbeddURL:  "http://127.0.0.1:3216",
+			VectordURL: "http://127.0.0.1:3215",
+		},
 		Queryd: QuerydConfig{
 			Bind:        "127.0.0.1:3214",
 			CatalogdURL: "http://127.0.0.1:3212",
diff --git a/lakehouse.toml b/lakehouse.toml
index ab17993..a54682d 100644
--- a/lakehouse.toml
+++ b/lakehouse.toml
@@ -13,6 +13,7 @@ queryd_url = "http://127.0.0.1:3214"
 vectord_url = "http://127.0.0.1:3215"
 embedd_url = "http://127.0.0.1:3216"
 pathwayd_url = "http://127.0.0.1:3217"
+matrixd_url = "http://127.0.0.1:3218"
 
 [storaged]
 bind = "127.0.0.1:3211"
@@ -54,6 +55,14 @@ bind = "127.0.0.1:3217"
 # /var/lib/lakehouse/pathway/state.jsonl so traces survive restart.
 persist_path = ""
 
+[matrixd]
+bind = "127.0.0.1:3218"
+# matrixd calls embedd (query-text → vector) and vectord (per-corpus
+# search) directly. Localhost defaults; in distributed deployments
+# these point at the gateway's upstream addresses.
+embedd_url = "http://127.0.0.1:3216"
+vectord_url = "http://127.0.0.1:3215"
+
 [s3]
 endpoint = "http://localhost:9000"
 region = "us-east-1"
diff --git a/scripts/matrix_smoke.sh b/scripts/matrix_smoke.sh
new file mode 100755
index 0000000..2cd6f19
--- /dev/null
+++ b/scripts/matrix_smoke.sh
@@ -0,0 +1,209 @@
+#!/usr/bin/env bash
+# Matrix smoke — multi-corpus retrieve+merge via matrixd (SPEC §3.4).
+# All assertions go through gateway :3110.
+#
+# Validates:
+#   - Multi-corpus search returns hits from BOTH corpora
+#   - Each result carries its corpus attribution (load-bearing — losing
+#     it defeats the matrix's purpose)
+#   - Merged top-k is ordered by distance across corpora
+#   - /matrix/corpora lists known indexes
+#   - Empty corpora list → 400
+#   - Bad corpus name → 502 (matrix bubbles vectord's 404 as upstream error)
+#
+# Uses query_vector (not query_text) to skip the embedd dependency so
+# this smoke runs without Ollama. End-to-end embed→matrix→search has
+# its own integration test (next commit).
+#
+# Usage: ./scripts/matrix_smoke.sh
+
+set -euo pipefail
+cd "$(dirname "$0")/.."
+
+export PATH="$PATH:/usr/local/go/bin"
+
+echo "[matrix-smoke] building matrixd + vectord + gateway..."
+go build -o bin/ ./cmd/matrixd ./cmd/vectord ./cmd/gateway
+
+pkill -f "bin/(matrixd|vectord|gateway)" 2>/dev/null || true
+sleep 0.3
+
+PIDS=()
+TMP="$(mktemp -d)"
+CFG="$TMP/matrix.toml"
+
+cleanup() {
+  echo "[matrix-smoke] cleanup"
+  # ${PIDS[@]:-} keeps set -u happy on bash < 4.4 when no PID was recorded.
+  for p in "${PIDS[@]:-}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done
+  rm -rf "$TMP"
+}
+trap cleanup EXIT INT TERM
+
+# Custom toml: vectord persistence disabled (don't pollute storaged
+# state with the test corpora).
+# NOTE(review): heredoc body and poll_health were reconstructed — the
+# original text was corrupted in transit. Verify the vectord persistence
+# key name and the /health endpoint path against the actual services.
+cat > "$CFG" <<EOF
+[gateway]
+bind = "127.0.0.1:3110"
+matrixd_url = "http://127.0.0.1:3218"
+
+[vectord]
+bind = "127.0.0.1:3215"
+persist_path = ""
+
+[matrixd]
+bind = "127.0.0.1:3218"
+embedd_url = "http://127.0.0.1:3216"
+vectord_url = "http://127.0.0.1:3215"
+EOF
+
+poll_health() {
+  local port="$1"
+  for _ in $(seq 1 100); do
+    if curl -s "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
+    sleep 0.05
+  done
+  return 1
+}
+
+echo "[matrix-smoke] launching vectord → matrixd → gateway..."
+./bin/vectord -config "$CFG" > /tmp/vectord.log 2>&1 &
+PIDS+=($!)
+poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; exit 1; }
+
+./bin/matrixd -config "$CFG" > /tmp/matrixd.log 2>&1 &
+PIDS+=($!)
+poll_health 3218 || { echo "matrixd failed"; tail /tmp/matrixd.log; exit 1; }
+
+./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 &
+PIDS+=($!)
+poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }
+
+FAILED=0
+DIM=4
+
+# Create two corpora — corpus_a and corpus_b — each with a few
+# vectors at known distances from a chosen query vector.
+echo "[matrix-smoke] create two corpora:"
+for c in corpus_a corpus_b; do
+  HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/vectors/index \
+    -H 'Content-Type: application/json' \
+    -d "{\"name\":\"$c\",\"dimension\":$DIM,\"distance\":\"euclidean\"}")"
+  if [ "$HTTP" != "201" ]; then echo "  ✗ create $c → $HTTP"; FAILED=1; fi
+done
+echo "  ✓ corpus_a and corpus_b created"
+
+# Add vectors. Use euclidean distance for predictable arithmetic.
+# Query vector will be [1,0,0,0]. Distances from it:
+#   corpus_a/a-near : [1.1, 0, 0, 0]  ≈ 0.1
+#   corpus_a/a-mid  : [1, 0.5, 0, 0]  ≈ 0.5
+#   corpus_a/a-far  : [3, 0, 0, 0]    ≈ 2.0
+#   corpus_b/b-near : [1.05, 0, 0, 0] ≈ 0.05  (closest globally)
+#   corpus_b/b-mid  : [1, 0.7, 0, 0]  ≈ 0.7
+#   corpus_b/b-far  : [4, 0, 0, 0]    ≈ 3.0
+echo "[matrix-smoke] add vectors to both corpora:"
+curl -sS -o /dev/null -X POST "http://127.0.0.1:3110/v1/vectors/index/corpus_a/add" \
+  -H 'Content-Type: application/json' \
+  -d '{"items":[
+    {"id":"a-near","vector":[1.1,0,0,0],"metadata":{"label":"a near"}},
+    {"id":"a-mid","vector":[1,0.5,0,0],"metadata":{"label":"a mid"}},
+    {"id":"a-far","vector":[3,0,0,0],"metadata":{"label":"a far"}}
+  ]}'
+curl -sS -o /dev/null -X POST "http://127.0.0.1:3110/v1/vectors/index/corpus_b/add" \
+  -H 'Content-Type: application/json' \
+  -d '{"items":[
+    {"id":"b-near","vector":[1.05,0,0,0],"metadata":{"label":"b near"}},
+    {"id":"b-mid","vector":[1,0.7,0,0],"metadata":{"label":"b mid"}},
+    {"id":"b-far","vector":[4,0,0,0],"metadata":{"label":"b far"}}
+  ]}'
+echo "  ✓ 3 + 3 vectors loaded"
+
+# ── 1. /matrix/corpora lists both ─────────────────────────────────
+echo "[matrix-smoke] /matrix/corpora lists both:"
+RESP="$(curl -sS http://127.0.0.1:3110/v1/matrix/corpora)"
+COUNT="$(echo "$RESP" | jq -r '.count')"
+HAS_A="$(echo "$RESP" | jq -r '.corpora | index("corpus_a") != null')"
+HAS_B="$(echo "$RESP" | jq -r '.corpora | index("corpus_b") != null')"
+if [ "$COUNT" = "2" ] && [ "$HAS_A" = "true" ] && [ "$HAS_B" = "true" ]; then
+  echo "  ✓ count=2, both corpora listed"
+else
+  echo "  ✗ resp: $RESP"; FAILED=1
+fi
+
+# ── 2. multi-corpus search returns hits from BOTH ─────────────────
+echo "[matrix-smoke] /matrix/search multi-corpus retrieve+merge:"
+RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/matrix/search \
+  -H 'Content-Type: application/json' \
+  -d '{"query_vector":[1,0,0,0],"corpora":["corpus_a","corpus_b"],"k":4,"per_corpus_k":3}')"
+RESULTS_LEN="$(echo "$RESP" | jq -r '.results | length')"
+A_COUNT="$(echo "$RESP" | jq -r '.per_corpus_counts.corpus_a')"
+B_COUNT="$(echo "$RESP" | jq -r '.per_corpus_counts.corpus_b')"
+HAS_A_RESULT="$(echo "$RESP" | jq -r '[.results[] | select(.corpus=="corpus_a")] | length > 0')"
+HAS_B_RESULT="$(echo "$RESP" | jq -r '[.results[] | select(.corpus=="corpus_b")] | length > 0')"
+if [ "$RESULTS_LEN" = "4" ] && [ "$A_COUNT" = "3" ] && [ "$B_COUNT" = "3" ] && [ "$HAS_A_RESULT" = "true" ] && [ "$HAS_B_RESULT" = "true" ]; then
+  echo "  ✓ 4 merged results · 3+3 per-corpus · both corpora represented"
+else
+  echo "  ✗ len=$RESULTS_LEN per_corpus={a:$A_COUNT b:$B_COUNT} a_hit=$HAS_A_RESULT b_hit=$HAS_B_RESULT"
+  echo "    full: $RESP"
+  FAILED=1
+fi
+
+# ── 3. distance-merged top-k correct across corpora ───────────────
+echo "[matrix-smoke] top hit comes from corpus_b (b-near is globally closest):"
+TOP_ID="$(echo "$RESP" | jq -r '.results[0].id')"
+TOP_CORPUS="$(echo "$RESP" | jq -r '.results[0].corpus')"
+if [ "$TOP_ID" = "b-near" ] && [ "$TOP_CORPUS" = "corpus_b" ]; then
+  echo "  ✓ top hit: id=b-near corpus=corpus_b (closer than corpus_a's a-near)"
+else
+  echo "  ✗ top: id=$TOP_ID corpus=$TOP_CORPUS (expected b-near/corpus_b)"
+  FAILED=1
+fi
+
+# ── 4. corpus attribution preserved in metadata ───────────────────
+echo "[matrix-smoke] metadata preserved on merged results:"
+TOP_LABEL="$(echo "$RESP" | jq -r '.results[0].metadata.label')"
+if [ "$TOP_LABEL" = "b near" ]; then
+  echo "  ✓ metadata.label round-trips through matrix"
+else
+  echo "  ✗ label=$TOP_LABEL"; FAILED=1
+fi
+
+# ── 5. distances ascending in result list ─────────────────────────
+echo "[matrix-smoke] results sorted by distance ascending:"
+ASCENDING="$(echo "$RESP" | jq -r '[.results[].distance] | . == (sort)')"
+if [ "$ASCENDING" = "true" ]; then
+  echo "  ✓ distances ascending"
+else
+  echo "  ✗ distances not sorted: $(echo "$RESP" | jq -c '[.results[].distance]')"
+  FAILED=1
+fi
+
+# ── 6. negative paths ─────────────────────────────────────────────
+echo "[matrix-smoke] empty corpora → 400:"
+HTTP_400="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/matrix/search \
+  -H 'Content-Type: application/json' \
+  -d '{"query_vector":[1,0,0,0],"corpora":[],"k":4}')"
+echo "[matrix-smoke] missing corpus name → 502:"
+HTTP_502="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/matrix/search \
+  -H 'Content-Type: application/json' \
+  -d '{"query_vector":[1,0,0,0],"corpora":["does_not_exist"],"k":4}')"
+echo "[matrix-smoke] no query (empty text and vector) → 400:"
+HTTP_400b="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/matrix/search \
+  -H 'Content-Type: application/json' \
+  -d '{"corpora":["corpus_a"],"k":4}')"
+if [ "$HTTP_400" = "400" ] && [ "$HTTP_502" = "502" ] && [ "$HTTP_400b" = "400" ]; then
+  echo "  ✓ empty=400, missing-corpus=502, no-query=400"
+else
+  echo "  ✗ empty=$HTTP_400 missing=$HTTP_502 noquery=$HTTP_400b"
+  FAILED=1
+fi
+
+if [ "$FAILED" -eq 0 ]; then
+  echo "[matrix-smoke] Matrix acceptance gate: PASSED"
+  exit 0
+else
+  echo "[matrix-smoke] Matrix acceptance gate: FAILED"
+  exit 1
+fi