G2: embedd — text → vector via Ollama · 2 scrum fixes
Bridges the missing piece for the staffing co-pilot: text inputs to
vectord-shaped vectors. Standalone cmd/embedd on :3216 fronted by
gateway at /v1/embed. Pluggable embed.Provider interface (G2 ships
Ollama; OpenAI/Voyage swap in via the same interface in G3+).
Wire format:
POST /v1/embed {"texts":[...], "model":"..."} // model optional
→ 200 {"model","dimension","vectors":[[...]]}
Default model: nomic-embed-text (768-d). Ollama returns float64;
provider converts to float32 at the boundary so vectors flow through
vectord/HNSW without re-conversion.
Acceptance smoke 5/5 PASS — including the architectural payoff:
end-to-end embed → vectord add → search by re-embedded text returns
recall=1 at distance 5.96e-8 (float32 precision noise on identical
unit vectors). The staffing co-pilot pipeline (text → vector →
similarity search) is now functional end-to-end.
All 9 smokes (D1-D6 + G1 + G1P + G2) PASS deterministically.
Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 0 BLOCK + 4 WARN + 3 INFO
- Kimi K2-0905 (openrouter): 0 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): "No BLOCKs" (3 tokens)
Fixed (2 — 1 convergent + 1 single-reviewer):
C1 (Opus + Kimi convergent WARN): per-text 60s timeout × N-text
batch was up to N×60s with no batch-level cap. One stuck Ollama
call would stall the whole handler indefinitely. Fix:
context.WithTimeout(r.Context(), 60s) wraps the entire batch.
O-W3 (Opus WARN): empty strings in texts went to Ollama unchecked,
producing version-dependent garbage. Fix: reject "" with 400 at
the handler boundary so callers get a deterministic answer
instead of an upstream-conditional 502.
Deferred (4): drainAndClose 64KiB cap (matches G0 pattern), no
concurrency limit on /embed (single-tenant G2), missing Accept
header (exotic-proxy concern), MaxBytesError string-match
redundancy (paranoia layer kept consistent across codebase).
Zero false positives this round — Qwen returned 3 tokens "No BLOCKs"
and the other two reviewers' findings were all real.
Setup confirmed: Ollama 0.21.0 on :11434 with nomic-embed-text loaded.
Per-text /api/embeddings used (forward-compat with 0.21+); newer
0.4+ /api/embed batch endpoint can swap in via the Provider interface.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
8b92518d21
commit
9ee7fc5550
140
cmd/embedd/main.go
Normal file
140
cmd/embedd/main.go
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
// embedd is the embedding service. Turns text into vectors via a
|
||||||
|
// pluggable Provider (G2: Ollama at :11434). Vectors flow through
|
||||||
|
// the rest of the stack as float32 — see internal/embed for the
|
||||||
|
// boundary conversion. Default model is config-resolved; callers
|
||||||
|
// can override per request.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"flag"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
|
||||||
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/embed"
|
||||||
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
// maxRequestBytes caps the POST /embed request body at 4 MiB. The
// body carries a list of texts, so the cap is sized for several inputs.
const maxRequestBytes = 4 << 20

// batchDeadline is the upper bound on how long a single /embed batch
// is allowed to run.
const batchDeadline = 60 * time.Second
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
cfg, err := shared.LoadConfig(*configPath)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("config", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
if cfg.Embedd.ProviderURL == "" {
|
||||||
|
slog.Error("config", "err", "embedd.provider_url is required")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
h := &handlers{
|
||||||
|
provider: embed.NewOllama(cfg.Embedd.ProviderURL, cfg.Embedd.DefaultModel),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := shared.Run("embedd", cfg.Embedd.Bind, h.register); err != nil {
|
||||||
|
slog.Error("server", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type handlers struct {
|
||||||
|
provider embed.Provider
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handlers) register(r chi.Router) {
|
||||||
|
r.Post("/embed", h.handleEmbed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// embedRequest is the JSON body accepted by POST /embed.
//
// Texts lists the inputs to embed. Model is optional; when it is left
// empty the server-side default model is used.
type embedRequest struct {
	Texts []string `json:"texts"`
	Model string   `json:"model,omitempty"`
}
|
||||||
|
|
||||||
|
func (h *handlers) handleEmbed(w http.ResponseWriter, r *http.Request) {
|
||||||
|
defer r.Body.Close()
|
||||||
|
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
|
||||||
|
var req embedRequest
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
var maxErr *http.MaxBytesError
|
||||||
|
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
|
||||||
|
http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Per scrum O-W3 (Opus): reject empty strings up front. Ollama's
|
||||||
|
// behavior on empty prompt is version-dependent (some return
|
||||||
|
// errors, some return zero vectors); rejecting at the boundary
|
||||||
|
// gives callers a deterministic 400 instead of 502.
|
||||||
|
for j, t := range req.Texts {
|
||||||
|
if t == "" {
|
||||||
|
http.Error(w, "texts["+itoa(j)+"]: empty string", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Per scrum C1 (Opus + Kimi convergent): per-text 60s timeout
|
||||||
|
// without a batch-level cap means a 100-text batch with one
|
||||||
|
// stuck call can pin the handler for ~6000s. Set a hard batch
|
||||||
|
// ceiling derived from the request ctx so a wedged Ollama
|
||||||
|
// surfaces as 504-ish (mapped to 502 by the upstream-error
|
||||||
|
// path below) rather than holding the connection forever.
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), batchDeadline)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
res, err := h.provider.Embed(ctx, req.Texts, req.Model)
|
||||||
|
if errors.Is(err, embed.ErrEmptyTexts) {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if errors.Is(err, embed.ErrModelMismatch) {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// Upstream-shape errors (Ollama down, model missing,
|
||||||
|
// 5xx body) bubble up as 502 — distinguishes "your input
|
||||||
|
// was wrong" (400) from "the embedding backend was wrong" (502).
|
||||||
|
slog.Warn("embed", "err", err)
|
||||||
|
http.Error(w, "embed: "+err.Error(), http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
if err := json.NewEncoder(w).Encode(res); err != nil {
|
||||||
|
slog.Warn("embed encode", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// itoa formats i in base 10. It is a tiny helper for error messages
// that avoids pulling strconv in for a single call site.
//
// Negative values are rendered with a leading '-'; without the explicit
// sign handling the digit loop would never run for them and the result
// would be the empty string.
func itoa(i int) string {
	if i == 0 {
		return "0"
	}
	negative := i < 0
	// Work on the magnitude as an unsigned value. Unsigned negation
	// wraps, so the most negative int converts without overflowing.
	mag := uint64(i)
	if negative {
		mag = -mag
	}
	// 20 bytes hold the 19 digits of a 64-bit magnitude plus the sign.
	var buf [20]byte
	pos := len(buf)
	for mag > 0 {
		pos--
		buf[pos] = byte('0' + mag%10)
		mag /= 10
	}
	if negative {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}
|
||||||
@ -43,6 +43,7 @@ func main() {
|
|||||||
"ingestd_url": cfg.Gateway.IngestdURL,
|
"ingestd_url": cfg.Gateway.IngestdURL,
|
||||||
"queryd_url": cfg.Gateway.QuerydURL,
|
"queryd_url": cfg.Gateway.QuerydURL,
|
||||||
"vectord_url": cfg.Gateway.VectordURL,
|
"vectord_url": cfg.Gateway.VectordURL,
|
||||||
|
"embedd_url": cfg.Gateway.EmbeddURL,
|
||||||
}
|
}
|
||||||
for k, v := range upstreams {
|
for k, v := range upstreams {
|
||||||
if v == "" {
|
if v == "" {
|
||||||
@ -61,12 +62,14 @@ func main() {
|
|||||||
ingestdURL := mustParseUpstream("ingestd_url", cfg.Gateway.IngestdURL)
|
ingestdURL := mustParseUpstream("ingestd_url", cfg.Gateway.IngestdURL)
|
||||||
querydURL := mustParseUpstream("queryd_url", cfg.Gateway.QuerydURL)
|
querydURL := mustParseUpstream("queryd_url", cfg.Gateway.QuerydURL)
|
||||||
vectordURL := mustParseUpstream("vectord_url", cfg.Gateway.VectordURL)
|
vectordURL := mustParseUpstream("vectord_url", cfg.Gateway.VectordURL)
|
||||||
|
embeddURL := mustParseUpstream("embedd_url", cfg.Gateway.EmbeddURL)
|
||||||
|
|
||||||
storagedProxy := gateway.NewProxyHandler(storagedURL)
|
storagedProxy := gateway.NewProxyHandler(storagedURL)
|
||||||
catalogdProxy := gateway.NewProxyHandler(catalogdURL)
|
catalogdProxy := gateway.NewProxyHandler(catalogdURL)
|
||||||
ingestdProxy := gateway.NewProxyHandler(ingestdURL)
|
ingestdProxy := gateway.NewProxyHandler(ingestdURL)
|
||||||
querydProxy := gateway.NewProxyHandler(querydURL)
|
querydProxy := gateway.NewProxyHandler(querydURL)
|
||||||
vectordProxy := gateway.NewProxyHandler(vectordURL)
|
vectordProxy := gateway.NewProxyHandler(vectordURL)
|
||||||
|
embeddProxy := gateway.NewProxyHandler(embeddURL)
|
||||||
|
|
||||||
if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) {
|
if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) {
|
||||||
// Storage / catalog have multi-segment paths under their
|
// Storage / catalog have multi-segment paths under their
|
||||||
@ -82,6 +85,8 @@ func main() {
|
|||||||
r.Handle("/v1/sql", querydProxy)
|
r.Handle("/v1/sql", querydProxy)
|
||||||
// Vector search routes — /v1/vectors/index, /v1/vectors/index/{name}/...
|
// Vector search routes — /v1/vectors/index, /v1/vectors/index/{name}/...
|
||||||
r.Handle("/v1/vectors/*", vectordProxy)
|
r.Handle("/v1/vectors/*", vectordProxy)
|
||||||
|
// Embedding service — /v1/embed
|
||||||
|
r.Handle("/v1/embed", embeddProxy)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
slog.Error("server", "err", err)
|
slog.Error("server", "err", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
|||||||
43
internal/embed/embed.go
Normal file
43
internal/embed/embed.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
// Package embed turns text into vectors. G2 ships an Ollama-backed
|
||||||
|
// Provider (local model on :11434, no network call to a hosted
|
||||||
|
// service); the Provider interface keeps the door open for OpenAI /
|
||||||
|
// Voyage / etc. without per-call switching at the HTTP boundary.
|
||||||
|
//
|
||||||
|
// Vectors are float32 throughout the system (matches vectord +
|
||||||
|
// coder/hnsw types). Ollama returns float64; we convert at this
|
||||||
|
// package's boundary so callers don't have to think about it.
|
||||||
|
package embed
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Provider is the embed boundary. Embed takes a list of input
|
||||||
|
// texts and returns the same number of vectors, in input order,
|
||||||
|
// using the named model. Empty model = use the Provider's default.
|
||||||
|
type Provider interface {
|
||||||
|
Embed(ctx context.Context, texts []string, model string) (Result, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Result is what one Embed call returns.
//
// Model names the model that was actually used, after any default has
// been resolved. Dimension is the vector dimension reported by that
// model, and the Provider guarantees that every entry of Vectors has
// exactly Dimension components.
type Result struct {
	Model     string      `json:"model"`
	Dimension int         `json:"dimension"`
	Vectors   [][]float32 `json:"vectors"`
}
|
||||||
|
|
||||||
|
// Sentinel errors returned by Providers; match them with errors.Is.
var (
	// ErrEmptyTexts reports that the caller supplied no inputs. An
	// empty batch is treated as an error because silently returning an
	// empty result wastes the round trip and hides caller bugs.
	ErrEmptyTexts = errors.New("embed: empty texts")

	// ErrModelMismatch reports that, within a single batch, one
	// upstream call produced a vector of a different dimension than an
	// earlier call. That is only possible if the model actually changed
	// mid-batch (a load swap on the server). Callers surface it as a
	// 502 because the upstream behaved inconsistently.
	ErrModelMismatch = errors.New("embed: dimension changed mid-batch")
)
|
||||||
127
internal/embed/ollama.go
Normal file
127
internal/embed/ollama.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
// ollama.go — Provider backed by an Ollama HTTP server. Compatible
|
||||||
|
// with Ollama 0.21+ via the per-text /api/embeddings endpoint.
|
||||||
|
// Newer Ollama (0.4+) exposes /api/embed for batched calls, but
|
||||||
|
// the per-text loop is forward-compatible with both.
|
||||||
|
package embed
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OllamaProvider is a Provider that talks to an Ollama server over
// HTTP at a configured base URL.
type OllamaProvider struct {
	baseURL      string       // server root; NewOllama strips trailing slashes
	defaultModel string       // model used when a call names none
	hc           *http.Client // client used for every upstream request
}
|
||||||
|
|
||||||
|
// NewOllama builds a provider against baseURL (e.g.
|
||||||
|
// "http://localhost:11434"). defaultModel is what gets used when
|
||||||
|
// callers pass an empty model name.
|
||||||
|
func NewOllama(baseURL, defaultModel string) *OllamaProvider {
|
||||||
|
return &OllamaProvider{
|
||||||
|
baseURL: strings.TrimRight(baseURL, "/"),
|
||||||
|
defaultModel: defaultModel,
|
||||||
|
hc: &http.Client{
|
||||||
|
// Embeddings are CPU-bound on the server side; 60s gives
|
||||||
|
// plenty of headroom for a single-text call. Caller can
|
||||||
|
// add an outer ctx deadline for batch-level cap.
|
||||||
|
Timeout: 60 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ollamaRequest is the JSON body sent to Ollama's /api/embeddings
// endpoint: the model to use and the single text to embed.
type ollamaRequest struct {
	Model  string `json:"model"`
	Prompt string `json:"prompt"`
}
|
||||||
|
|
||||||
|
// ollamaResponse is the success body of /api/embeddings. Ollama sends
// the embedding as float64; it is narrowed to float32 at this package's
// boundary.
type ollamaResponse struct {
	Embedding []float64 `json:"embedding"`
}
|
||||||
|
|
||||||
|
// Embed loops over texts, issuing one HTTP call per text. Errors
|
||||||
|
// short-circuit — if call N fails, we return the error and the
|
||||||
|
// caller sees no partial Result.
|
||||||
|
func (p *OllamaProvider) Embed(ctx context.Context, texts []string, model string) (Result, error) {
|
||||||
|
if len(texts) == 0 {
|
||||||
|
return Result{}, ErrEmptyTexts
|
||||||
|
}
|
||||||
|
if model == "" {
|
||||||
|
model = p.defaultModel
|
||||||
|
}
|
||||||
|
if model == "" {
|
||||||
|
return Result{}, fmt.Errorf("embed: no model (empty request, no default)")
|
||||||
|
}
|
||||||
|
|
||||||
|
out := Result{Model: model, Vectors: make([][]float32, 0, len(texts))}
|
||||||
|
for i, text := range texts {
|
||||||
|
vec, err := p.embedOne(ctx, model, text)
|
||||||
|
if err != nil {
|
||||||
|
return Result{}, fmt.Errorf("embed text[%d]: %w", i, err)
|
||||||
|
}
|
||||||
|
// Per-text vectors must agree on dimension. Lock on first.
|
||||||
|
if out.Dimension == 0 {
|
||||||
|
out.Dimension = len(vec)
|
||||||
|
} else if len(vec) != out.Dimension {
|
||||||
|
return Result{}, fmt.Errorf("%w: text[%d] returned %d, prior were %d",
|
||||||
|
ErrModelMismatch, i, len(vec), out.Dimension)
|
||||||
|
}
|
||||||
|
out.Vectors = append(out.Vectors, vec)
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *OllamaProvider) embedOne(ctx context.Context, model, text string) ([]float32, error) {
|
||||||
|
body, err := json.Marshal(ollamaRequest{Model: model, Prompt: text})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal: %w", err)
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
||||||
|
p.baseURL+"/api/embeddings", bytes.NewReader(body))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("req: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req.ContentLength = int64(len(body))
|
||||||
|
|
||||||
|
resp, err := p.hc.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("do: %w", err)
|
||||||
|
}
|
||||||
|
defer drainAndClose(resp.Body)
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
|
||||||
|
return nil, fmt.Errorf("upstream status %d: %s", resp.StatusCode, string(preview))
|
||||||
|
}
|
||||||
|
var or ollamaResponse
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&or); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode: %w", err)
|
||||||
|
}
|
||||||
|
if len(or.Embedding) == 0 {
|
||||||
|
return nil, fmt.Errorf("upstream returned empty embedding")
|
||||||
|
}
|
||||||
|
// Float64 → Float32. Loss of precision is acceptable for HNSW
|
||||||
|
// search; float32 matches the rest of the system.
|
||||||
|
out := make([]float32, len(or.Embedding))
|
||||||
|
for i, v := range or.Embedding {
|
||||||
|
out[i] = float32(v)
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// drainAndClose discards up to 64 KiB of whatever remains unread in
// body and then closes it. Errors from both steps are ignored.
func drainAndClose(body io.ReadCloser) {
	defer func() { _ = body.Close() }()

	leftover := &io.LimitedReader{R: body, N: 64 << 10}
	_, _ = io.Copy(io.Discard, leftover)
}
|
||||||
115
internal/embed/ollama_test.go
Normal file
115
internal/embed/ollama_test.go
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
package embed
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOllama_EmbedBatch_PreservesOrder(t *testing.T) {
|
||||||
|
var mu sync.Mutex
|
||||||
|
var seenPrompts []string
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req ollamaRequest
|
||||||
|
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||||
|
mu.Lock()
|
||||||
|
seenPrompts = append(seenPrompts, req.Prompt)
|
||||||
|
mu.Unlock()
|
||||||
|
// Return a vector that encodes which prompt this was, so
|
||||||
|
// we can assert order at the caller. 4-d vector for cheap.
|
||||||
|
var vec [4]float64
|
||||||
|
switch req.Prompt {
|
||||||
|
case "alpha":
|
||||||
|
vec = [4]float64{1, 0, 0, 0}
|
||||||
|
case "beta":
|
||||||
|
vec = [4]float64{0, 1, 0, 0}
|
||||||
|
case "gamma":
|
||||||
|
vec = [4]float64{0, 0, 1, 0}
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{"embedding": vec[:]})
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
p := NewOllama(srv.URL, "test-model")
|
||||||
|
res, err := p.Embed(context.Background(), []string{"alpha", "beta", "gamma"}, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if res.Model != "test-model" || res.Dimension != 4 || len(res.Vectors) != 3 {
|
||||||
|
t.Fatalf("Result: got %+v", res)
|
||||||
|
}
|
||||||
|
if res.Vectors[0][0] != 1 || res.Vectors[1][1] != 1 || res.Vectors[2][2] != 1 {
|
||||||
|
t.Errorf("vectors out of order: %v", res.Vectors)
|
||||||
|
}
|
||||||
|
// Sanity: all three prompts hit the server.
|
||||||
|
if len(seenPrompts) != 3 {
|
||||||
|
t.Errorf("expected 3 upstream calls, got %d", len(seenPrompts))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOllama_EmptyTextsErrors(t *testing.T) {
|
||||||
|
p := NewOllama("http://nope:0", "x")
|
||||||
|
_, err := p.Embed(context.Background(), nil, "")
|
||||||
|
if !errors.Is(err, ErrEmptyTexts) {
|
||||||
|
t.Errorf("expected ErrEmptyTexts, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOllama_NoModelNoDefault(t *testing.T) {
|
||||||
|
p := NewOllama("http://nope:0", "") // empty default
|
||||||
|
_, err := p.Embed(context.Background(), []string{"hi"}, "")
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "no model") {
|
||||||
|
t.Errorf("expected no-model error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOllama_UpstreamErrorPropagates(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
http.Error(w, "model not loaded", http.StatusInternalServerError)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
p := NewOllama(srv.URL, "x")
|
||||||
|
_, err := p.Embed(context.Background(), []string{"hi"}, "")
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "upstream status 500") {
|
||||||
|
t.Errorf("expected wrapped 500 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOllama_DimensionMismatchMidBatch(t *testing.T) {
|
||||||
|
calls := 0
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
calls++
|
||||||
|
// First call returns 4-d, second returns 8-d → server changed
|
||||||
|
// model under us. Provider should ErrModelMismatch.
|
||||||
|
var v []float64
|
||||||
|
if calls == 1 {
|
||||||
|
v = []float64{1, 0, 0, 0}
|
||||||
|
} else {
|
||||||
|
v = []float64{1, 0, 0, 0, 0, 0, 0, 0}
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{"embedding": v})
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
p := NewOllama(srv.URL, "x")
|
||||||
|
_, err := p.Embed(context.Background(), []string{"a", "b"}, "")
|
||||||
|
if !errors.Is(err, ErrModelMismatch) {
|
||||||
|
t.Errorf("expected ErrModelMismatch, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOllama_EmptyEmbeddingErrors(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{"embedding": []float64{}})
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
p := NewOllama(srv.URL, "x")
|
||||||
|
_, err := p.Embed(context.Background(), []string{"hi"}, "")
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "empty embedding") {
|
||||||
|
t.Errorf("expected empty-embedding error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -25,6 +25,7 @@ type Config struct {
|
|||||||
Ingestd IngestConfig `toml:"ingestd"`
|
Ingestd IngestConfig `toml:"ingestd"`
|
||||||
Queryd QuerydConfig `toml:"queryd"`
|
Queryd QuerydConfig `toml:"queryd"`
|
||||||
Vectord VectordConfig `toml:"vectord"`
|
Vectord VectordConfig `toml:"vectord"`
|
||||||
|
Embedd EmbeddConfig `toml:"embedd"`
|
||||||
S3 S3Config `toml:"s3"`
|
S3 S3Config `toml:"s3"`
|
||||||
Log LogConfig `toml:"log"`
|
Log LogConfig `toml:"log"`
|
||||||
}
|
}
|
||||||
@ -48,9 +49,9 @@ type IngestConfig struct {
|
|||||||
|
|
||||||
// GatewayConfig adds the upstream URLs the reverse proxy fronts.
|
// GatewayConfig adds the upstream URLs the reverse proxy fronts.
|
||||||
// Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql,
|
// Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql,
|
||||||
// /v1/vectors) has its own upstream so we can scale services
|
// /v1/vectors, /v1/embed) has its own upstream so we can scale
|
||||||
// independently or move them to different boxes without touching
|
// services independently or move them to different boxes without
|
||||||
// gateway code.
|
// touching gateway code.
|
||||||
type GatewayConfig struct {
|
type GatewayConfig struct {
|
||||||
Bind string `toml:"bind"`
|
Bind string `toml:"bind"`
|
||||||
StoragedURL string `toml:"storaged_url"`
|
StoragedURL string `toml:"storaged_url"`
|
||||||
@ -58,6 +59,17 @@ type GatewayConfig struct {
|
|||||||
IngestdURL string `toml:"ingestd_url"`
|
IngestdURL string `toml:"ingestd_url"`
|
||||||
QuerydURL string `toml:"queryd_url"`
|
QuerydURL string `toml:"queryd_url"`
|
||||||
VectordURL string `toml:"vectord_url"`
|
VectordURL string `toml:"vectord_url"`
|
||||||
|
EmbeddURL string `toml:"embedd_url"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// EmbeddConfig drives the embed service. ProviderURL points at the
|
||||||
|
// embedding backend (Ollama in G2, possibly OpenAI/Voyage in G3+).
|
||||||
|
// DefaultModel is what gets used when callers don't specify a
|
||||||
|
// model in their request body.
|
||||||
|
type EmbeddConfig struct {
|
||||||
|
Bind string `toml:"bind"`
|
||||||
|
ProviderURL string `toml:"provider_url"`
|
||||||
|
DefaultModel string `toml:"default_model"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// VectordConfig adds vectord-specific knobs. StoragedURL is
|
// VectordConfig adds vectord-specific knobs. StoragedURL is
|
||||||
@ -123,6 +135,7 @@ func DefaultConfig() Config {
|
|||||||
IngestdURL: "http://127.0.0.1:3213",
|
IngestdURL: "http://127.0.0.1:3213",
|
||||||
QuerydURL: "http://127.0.0.1:3214",
|
QuerydURL: "http://127.0.0.1:3214",
|
||||||
VectordURL: "http://127.0.0.1:3215",
|
VectordURL: "http://127.0.0.1:3215",
|
||||||
|
EmbeddURL: "http://127.0.0.1:3216",
|
||||||
},
|
},
|
||||||
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
|
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
|
||||||
Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
|
Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
|
||||||
@ -136,6 +149,11 @@ func DefaultConfig() Config {
|
|||||||
Bind: "127.0.0.1:3215",
|
Bind: "127.0.0.1:3215",
|
||||||
StoragedURL: "http://127.0.0.1:3211",
|
StoragedURL: "http://127.0.0.1:3211",
|
||||||
},
|
},
|
||||||
|
Embedd: EmbeddConfig{
|
||||||
|
Bind: "127.0.0.1:3216",
|
||||||
|
ProviderURL: "http://localhost:11434", // local Ollama
|
||||||
|
DefaultModel: "nomic-embed-text",
|
||||||
|
},
|
||||||
Queryd: QuerydConfig{
|
Queryd: QuerydConfig{
|
||||||
Bind: "127.0.0.1:3214",
|
Bind: "127.0.0.1:3214",
|
||||||
CatalogdURL: "http://127.0.0.1:3212",
|
CatalogdURL: "http://127.0.0.1:3212",
|
||||||
|
|||||||
@ -11,6 +11,7 @@ catalogd_url = "http://127.0.0.1:3212"
|
|||||||
ingestd_url = "http://127.0.0.1:3213"
|
ingestd_url = "http://127.0.0.1:3213"
|
||||||
queryd_url = "http://127.0.0.1:3214"
|
queryd_url = "http://127.0.0.1:3214"
|
||||||
vectord_url = "http://127.0.0.1:3215"
|
vectord_url = "http://127.0.0.1:3215"
|
||||||
|
embedd_url = "http://127.0.0.1:3216"
|
||||||
|
|
||||||
[storaged]
|
[storaged]
|
||||||
bind = "127.0.0.1:3211"
|
bind = "127.0.0.1:3211"
|
||||||
@ -33,6 +34,13 @@ bind = "127.0.0.1:3215"
|
|||||||
# Optional — set to empty string to disable persistence (dev/test).
|
# Optional — set to empty string to disable persistence (dev/test).
|
||||||
storaged_url = "http://127.0.0.1:3211"
|
storaged_url = "http://127.0.0.1:3211"
|
||||||
|
|
||||||
|
[embedd]
|
||||||
|
bind = "127.0.0.1:3216"
|
||||||
|
# G2: Ollama local. G3+ may swap in OpenAI/Voyage by changing
|
||||||
|
# this URL + the wire format inside the provider.
|
||||||
|
provider_url = "http://localhost:11434"
|
||||||
|
default_model = "nomic-embed-text"
|
||||||
|
|
||||||
[queryd]
|
[queryd]
|
||||||
bind = "127.0.0.1:3214"
|
bind = "127.0.0.1:3214"
|
||||||
catalogd_url = "http://127.0.0.1:3212"
|
catalogd_url = "http://127.0.0.1:3212"
|
||||||
|
|||||||
165
scripts/g2_smoke.sh
Executable file
165
scripts/g2_smoke.sh
Executable file
@ -0,0 +1,165 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# G2 smoke — embedd service. All assertions go through gateway :3110.
|
||||||
|
#
|
||||||
|
# Validates:
|
||||||
|
# - POST /v1/embed with 2 texts → 200, dim=768 (nomic-embed-text),
|
||||||
|
# vectors[0] != vectors[1] (different texts → different vectors)
|
||||||
|
# - Same text twice → byte-identical vectors (deterministic)
|
||||||
|
# - Empty texts → 400
|
||||||
|
# - Bad model → 502 from upstream Ollama
|
||||||
|
# - End-to-end with vectord: embed text → store → search by text →
|
||||||
|
# same text round-trips at distance ≈ 0 (proves embed→vectord
|
||||||
|
# pipeline works)
|
||||||
|
#
|
||||||
|
# Requires Ollama running at :11434 with nomic-embed-text loaded.
|
||||||
|
#
|
||||||
|
# Usage: ./scripts/g2_smoke.sh
|
||||||
|
|
||||||
|
set -euo pipefail
|
||||||
|
cd "$(dirname "$0")/.."
|
||||||
|
|
||||||
|
export PATH="$PATH:/usr/local/go/bin"
|
||||||
|
|
||||||
|
echo "[g2-smoke] building embedd + vectord + gateway..."
|
||||||
|
go build -o bin/ ./cmd/embedd ./cmd/vectord ./cmd/gateway
|
||||||
|
|
||||||
|
pkill -f "bin/(embedd|vectord|gateway)" 2>/dev/null || true
|
||||||
|
sleep 0.3
|
||||||
|
|
||||||
|
PIDS=()
|
||||||
|
TMP="$(mktemp -d)"
|
||||||
|
# cleanup signals every PID recorded in PIDS and removes the scratch
# directory $TMP. Installed as the EXIT/INT/TERM trap, so it must be
# safe to run at any point, including before anything was launched.
cleanup() {
  echo "[g2-smoke] cleanup"
  # "${PIDS[@]:-}" instead of "${PIDS[@]}": under `set -u`, expanding an
  # empty array is an unbound-variable error on bash older than 4.4, and
  # PIDS is still empty when the script exits before launching a service
  # (e.g. the "Ollama not reachable" skip). The fallback expands to one
  # empty word, which the -n test below skips.
  local pid
  for pid in "${PIDS[@]:-}"; do
    [ -n "$pid" ] && kill "$pid" 2>/dev/null || true
  done
  rm -rf "$TMP"
}
|
||||||
|
trap cleanup EXIT INT TERM
|
||||||
|
|
||||||
|
# poll_health PORT
# Polls http://127.0.0.1:PORT/health every 50 ms for up to 5 seconds.
# Returns 0 as soon as a curl request succeeds, 1 if the deadline passes.
poll_health() {
  local port="$1"
  local deadline=$(( $(date +%s) + 5 ))
  until [ "$(date +%s)" -ge "$deadline" ]; do
    if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then
      return 0
    fi
    sleep 0.05
  done
  return 1
}
|
||||||
|
|
||||||
|
# Verify Ollama is up before the test even starts — otherwise every
|
||||||
|
# embed call would 502 and the smoke would be misleading.
|
||||||
|
if ! curl -sS --max-time 3 http://localhost:11434/api/tags >/dev/null 2>&1; then
|
||||||
|
echo "[g2-smoke] Ollama not reachable on :11434 — skipping"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "[g2-smoke] launching embedd → vectord (no persist) → gateway..."
|
||||||
|
./bin/embedd > /tmp/embedd.log 2>&1 &
|
||||||
|
PIDS+=($!)
|
||||||
|
poll_health 3216 || { echo "embedd failed"; tail /tmp/embedd.log; exit 1; }
|
||||||
|
|
||||||
|
# vectord with persistence disabled (matches g1_smoke pattern —
|
||||||
|
# this smoke doesn't touch storaged).
|
||||||
|
./bin/vectord -config scripts/g1_smoke.toml > /tmp/vectord.log 2>&1 &
|
||||||
|
PIDS+=($!)
|
||||||
|
poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; exit 1; }
|
||||||
|
|
||||||
|
./bin/gateway > /tmp/gateway.log 2>&1 &
|
||||||
|
PIDS+=($!)
|
||||||
|
poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }
|
||||||
|
|
||||||
|
FAILED=0
|
||||||
|
|
||||||
|
echo "[g2-smoke] /v1/embed — two distinct texts:"
|
||||||
|
RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/embed \
|
||||||
|
-H 'Content-Type: application/json' \
|
||||||
|
-d '{"texts":["forklift operator with OSHA-30","CNC machinist precision parts"]}')"
|
||||||
|
DIM="$(echo "$RESP" | jq -r '.dimension')"
|
||||||
|
N="$(echo "$RESP" | jq -r '.vectors | length')"
|
||||||
|
MODEL="$(echo "$RESP" | jq -r '.model')"
|
||||||
|
SAME="$(echo "$RESP" | jq -r '.vectors[0][0] == .vectors[1][0]')"
|
||||||
|
if [ "$DIM" = "768" ] && [ "$N" = "2" ] && [ "$MODEL" = "nomic-embed-text" ] && [ "$SAME" = "false" ]; then
|
||||||
|
echo " ✓ dim=768, model=nomic-embed-text, 2 distinct vectors"
|
||||||
|
else
|
||||||
|
echo " ✗ resp: dim=$DIM n=$N model=$MODEL same=$SAME"; FAILED=1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "[g2-smoke] determinism — same text twice → byte-identical vector:"
|
||||||
|
RESP1="$(curl -sS -X POST http://127.0.0.1:3110/v1/embed \
|
||||||
|
-H 'Content-Type: application/json' \
|
||||||
|
-d '{"texts":["determinism check"]}' | jq -c '.vectors[0]')"
|
||||||
|
RESP2="$(curl -sS -X POST http://127.0.0.1:3110/v1/embed \
|
||||||
|
-H 'Content-Type: application/json' \
|
||||||
|
-d '{"texts":["determinism check"]}' | jq -c '.vectors[0]')"
|
||||||
|
if [ "$RESP1" = "$RESP2" ]; then
|
||||||
|
echo " ✓ identical text → identical vector"
|
||||||
|
else
|
||||||
|
echo " ✗ deterministic mismatch"; FAILED=1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "[g2-smoke] empty texts → 400:"
|
||||||
|
HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/embed \
|
||||||
|
-H 'Content-Type: application/json' -d '{"texts":[]}')"
|
||||||
|
if [ "$HTTP" = "400" ]; then echo " ✓ empty → 400"; else echo " ✗ empty → $HTTP"; FAILED=1; fi
|
||||||
|
|
||||||
|
echo "[g2-smoke] bad model → 502:"
|
||||||
|
HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/embed \
|
||||||
|
-H 'Content-Type: application/json' -d '{"texts":["x"],"model":"definitely-not-loaded"}')"
|
||||||
|
if [ "$HTTP" = "502" ]; then echo " ✓ unknown model → 502"; else echo " ✗ unknown → $HTTP"; FAILED=1; fi
|
||||||
|
|
||||||
|
# End-to-end check: embed a handful of texts, store the vectors in a fresh
# index, then re-embed the first text and search with it. The check passes
# when the top hit is id "w-0" and its distance is within 1e-4 of zero.
# Relies on $TMP (scratch directory) and $FAILED being set up earlier in the
# script.
echo "[g2-smoke] end-to-end: embed → vectord add → search by embed → recall:"
NAME="g2_demo"

# Create index. Default M/EfSearch; cosine distance.
curl -sS -o /dev/null -X POST http://127.0.0.1:3110/v1/vectors/index \
    -H 'Content-Type: application/json' \
    -d "{\"name\":\"$NAME\",\"dimension\":768,\"distance\":\"cosine\"}"

# Embed a few staffing-ish texts and add them. TEXTS is the single source of
# truth: the same JSON array is sent to /v1/embed and handed to the payload
# builder below, so ids, vectors and metadata cannot drift apart.
TEXTS='["forklift operator with OSHA-30","CNC machinist precision parts","warehouse picker night shift","dental hygienist 3 years experience"]'

# The embed response goes to a file rather than a shell variable: it holds
# one full vector per text and would otherwise have to travel through argv,
# where the kernel caps the size of a single argument.
curl -sS -o "$TMP/embeds.json" -X POST http://127.0.0.1:3110/v1/embed \
    -H 'Content-Type: application/json' \
    -d "{\"texts\":$TEXTS}"

# Build the add payload — id "w-<i>" + vector from embeds[i] + source text.
# A vector/text count mismatch is reported as an error instead of being
# silently truncated by zip().
if ! python3 - "$TMP/embeds.json" "$TEXTS" <<'EOF' > "$TMP/add.json"
import json, sys

with open(sys.argv[1]) as fh:
    embeds = json.load(fh)
texts = json.loads(sys.argv[2])
vectors = embeds.get("vectors") or []
if len(vectors) != len(texts):
    sys.exit(f"embed returned {len(vectors)} vectors for {len(texts)} texts")
items = [
    {"id": f"w-{i}", "vector": v, "metadata": {"text": t}}
    for i, (v, t) in enumerate(zip(vectors, texts))
]
print(json.dumps({"items": items}))
EOF
then
    echo " ✗ could not build add payload from embed response"
    FAILED=1
fi

curl -sS -o /dev/null -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/add" \
    -H 'Content-Type: application/json' -d @"$TMP/add.json"

# Search by embedding the FIRST text again — should retrieve w-0 at dist≈0
QUERY_VEC="$(curl -sS -X POST http://127.0.0.1:3110/v1/embed \
    -H 'Content-Type: application/json' \
    -d '{"texts":["forklift operator with OSHA-30"]}' | jq -c '.vectors[0]')"
SEARCH="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \
    -H 'Content-Type: application/json' \
    -d "{\"vector\":$QUERY_VEC,\"k\":3}")"
TOP_ID="$(echo "$SEARCH" | jq -r '.results[0].id')"
TOP_DIST="$(echo "$SEARCH" | jq -r '.results[0].distance')"

# The distance is passed to Python as an argument, not spliced into the
# program text, so a missing or non-numeric value ("null", empty) simply
# fails the float() parse and yields DIST_OK=n.
if python3 -c 'import sys; sys.exit(0 if abs(float(sys.argv[1])) < 1e-4 else 1)' "$TOP_DIST" 2>/dev/null; then
    DIST_OK=y
else
    DIST_OK=n
fi

if [ "$TOP_ID" = "w-0" ] && [ "$DIST_OK" = "y" ]; then
    echo " ✓ embed → store → search round-trip: w-0 at dist=$TOP_DIST"
else
    echo " ✗ recall: top=$TOP_ID dist=$TOP_DIST"
    echo " full: $SEARCH"
    FAILED=1
fi

# Clean up the index. Best-effort: a failed delete must not fail the run.
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3110/v1/vectors/index/$NAME" || true
|
||||||
|
|
||||||
|
# Final verdict: exit 0 only when FAILED compares numerically equal to 0.
# Any other value (including a non-numeric one) reports failure and exits 1.
[ "$FAILED" -eq 0 ] || {
    echo "[g2-smoke] G2 acceptance gate: FAILED"
    exit 1
}
echo "[g2-smoke] G2 acceptance gate: PASSED"
exit 0
|
||||||
Loading…
x
Reference in New Issue
Block a user