G1P: vectord persistence to storaged + scrum (3 fixes incl. 3-way convergent)
Adds optional persistence to vectord (G1's HNSW vector search). Single-
file framed format per index — eliminates the torn-write class that
the 3-way convergent scrum finding identified:
_vectors/<name>.lhv1 — single binary blob:
[4 bytes magic "LHV1"]
[4 bytes envelope_len uint32 BE]
[envelope bytes — JSON params + metadata + version]
[graph bytes — raw hnsw.Graph.Export]
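Parsing that frame takes only a few lines. An illustrative sketch (splitFrame is a made-up name for this message; the shipped code calls it unframe and returns typed errors):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitFrame parses the layout above: 4-byte magic, 4-byte
// big-endian envelope length, then the two payloads back to back.
func splitFrame(body []byte) (envelope, graph []byte, err error) {
	if len(body) < 8 || string(body[:4]) != "LHV1" {
		return nil, nil, errors.New("bad magic or truncated header")
	}
	envLen := binary.BigEndian.Uint32(body[4:8])
	if int(envLen) > len(body)-8 {
		return nil, nil, errors.New("envelope_len exceeds body")
	}
	return body[8 : 8+envLen], body[8+envLen:], nil
}

func main() {
	// "LHV1" + len=2 (big-endian) + envelope "ab" + graph "graph"
	frame := append([]byte("LHV1\x00\x00\x00\x02ab"), []byte("graph")...)
	env, graph, _ := splitFrame(frame)
	fmt.Printf("%s %s\n", env, graph) // ab graph
}
```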
Pre-extraction: internal/catalogd/store_client.go → internal/storeclient/
shared package, since both catalogd and vectord need it. Same pattern as
the pre-D5 catalogclient extraction.
Optional via [vectord].storaged_url config (empty = ephemeral mode).
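For reference, the corresponding lakehouse.toml fragment (values mirror DefaultConfig in this commit):

```toml
[vectord]
bind = "127.0.0.1:3215"
# Empty string disables persistence (ephemeral/dev mode).
storaged_url = "http://127.0.0.1:3211"
```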
On startup: List + Load each persisted index. After Create / batch Add /
DELETE: Save (or Delete from storaged). Save failures are logged-not-
fatal — in-memory state is the source of truth in flight.
Acceptance smoke G1P 8/8 PASS — kill+restart preserves state, post-
restart search returns dist=0 (graph round-trips exactly), DELETE
removes the file, post-delete restart shows count=0.
All 8 smokes (D1-D6 + G1 + G1P) PASS deterministically. The g1 smoke
gained a scripts/g1_smoke.toml config that disables persistence, so
the in-memory API test stays decoupled from any rehydrate-from-storaged
state contamination.

Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 5 WARN + 3 INFO
- Kimi K2-0905 (openrouter): 1 BLOCK + 2 WARN
- Qwen3-coder (openrouter): 2 BLOCK + 2 WARN + 1 INFO
Fixed (3 — 1 convergent + 2 single-reviewer):
C1 (Opus + Kimi + Qwen 3-WAY CONVERGENT WARN): Save was non-atomic
across two PUTs — envelope-succeeds + graph-fails left a half-
saved index that passed the "both present" List filter and
silently mismatched metadata against vectors on Load.
Fix: collapse to single framed file (no torn-write window
possible).
O-B1 (Opus BLOCK): isNotFound substring-matched "key not found"
against the wrapped error message — brittle, any 5xx body
containing that text would silently misclassify as missing.
Fix: errors.Is(err, storeclient.ErrKeyNotFound).
O-I3 (Opus INFO): handleAdd pre-validation only covered id+dim;
NaN/Inf/zero-norm could still fail mid-batch leaving partial
commits. Fix: extend pre-validation to call ValidateVector
(newly exported) per item before any commit.
Dismissed (3 false positives):
K-B1 + Q-B1 ("safeKey double-escapes %2F segments") — a false
convergent finding. The wire-protocol escape is decoded by
storaged's chi router on the way in; the on-disk key is the
original literal. %2F round-trips correctly through PathEscape
→ URL → chi decode → S3 key.
Q-B2 ("List vulnerable to race conditions") — vectord is single-
process; no concurrent Save against List in the same vectord.
Deferred (3): rehydrate per-index timeout (G2+ multi-index scale),
saveAfter request ctx (matches G0 timeout deferral), Encode RLock
during slow writer (documented as buffer-only API).
The C1 finding is the strongest signal of the cross-lineage filter's
value: three independent reviewers flagged the same torn-write hazard.
Single-file framing eliminates the class — there is now no Persistor
state in which envelope and graph can disagree.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b8c072cf0b
commit
8b92518d21
@@ -18,6 +18,7 @@ import (
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient"
)

func main() {
@@ -34,7 +35,7 @@ func main() {
		os.Exit(1)
	}

	store := catalogd.NewStoreClient(cfg.Catalogd.StoragedURL)
	store := storeclient.New(cfg.Catalogd.StoragedURL)
	registry := catalogd.NewRegistry(store)

	rehydrateCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -1,15 +1,14 @@
// vectord is the vector-search service. In-memory HNSW indexes
// keyed by string IDs with optional opaque JSON metadata. Wraps
// vectord is the vector-search service. HNSW indexes keyed by
// string IDs with optional opaque JSON metadata. Wraps
// github.com/coder/hnsw (pure Go, no cgo).
//
// G1 scope: in-memory only, single-process. Persistence to storaged
// + rehydrate-on-restart is the next piece. The HTTP surface is
// stable enough to build the staffing co-pilot's "find workers
// like X" path on top right now — the indexes just have to be
// rebuilt after a restart.
// G1 + persistence: indexes are persisted to storaged at
// _vectors/<name>.lhv1 and rehydrated on startup. Setting
// [vectord].storaged_url empty disables persistence (dev mode).
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
@@ -18,10 +17,12 @@ import (
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/go-chi/chi/v5"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
)

@@ -43,6 +44,24 @@ func main() {

	h := &handlers{reg: vectord.NewRegistry()}

	// Persistence is optional — empty StoragedURL = dev/ephemeral mode.
	if cfg.Vectord.StoragedURL != "" {
		h.persist = vectord.NewPersistor(storeclient.New(cfg.Vectord.StoragedURL))

		// Rehydrate any persisted indexes at startup. Failures are
		// logged-not-fatal: storaged might be coming up after vectord,
		// and an index that failed to load is still recoverable by
		// re-ingesting on top of an empty registry.
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		n, err := h.rehydrate(ctx)
		cancel()
		if err != nil {
			slog.Warn("rehydrate", "err", err, "loaded", n)
		} else {
			slog.Info("rehydrated", "indexes", n)
		}
	}

	if err := shared.Run("vectord", cfg.Vectord.Bind, h.register); err != nil {
		slog.Error("server", "err", err)
		os.Exit(1)
@@ -50,7 +69,65 @@ func main() {
	}
type handlers struct {
	reg *vectord.Registry
	reg     *vectord.Registry
	persist *vectord.Persistor // nil when persistence is disabled
}

// rehydrate enumerates persisted indexes and loads each into the
// registry. Returns the count of successfully loaded indexes plus
// the first error (if any) — caller decides fatality.
func (h *handlers) rehydrate(ctx context.Context) (int, error) {
	if h.persist == nil {
		return 0, nil
	}
	names, err := h.persist.List(ctx)
	if err != nil {
		return 0, err
	}
	loaded := 0
	for _, name := range names {
		idx, err := h.persist.Load(ctx, name)
		if err != nil {
			slog.Warn("rehydrate skip", "name", name, "err", err)
			continue
		}
		// The registry's Create rebuilds an empty Index from params;
		// we want the LOADED one (with vectors). Bypass via a
		// helper that registers a pre-built Index.
		if err := h.reg.RegisterPrebuilt(idx); err != nil {
			slog.Warn("rehydrate register", "name", name, "err", err)
			continue
		}
		loaded++
	}
	return loaded, nil
}

// saveAfter is the post-write persistence hook. Logs-not-fatal:
// in-memory state is the source of truth in flight; a failed save
// gets re-attempted on the next mutation, and the operator log
// shows the storaged outage.
func (h *handlers) saveAfter(idx *vectord.Index) {
	if h.persist == nil {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := h.persist.Save(ctx, idx); err != nil {
		slog.Warn("persist save", "name", idx.Params().Name, "err", err)
	}
}

// deleteAfter mirrors saveAfter for the Delete path.
func (h *handlers) deleteAfter(name string) {
	if h.persist == nil {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := h.persist.Delete(ctx, name); err != nil {
		slog.Warn("persist delete", "name", name, "err", err)
	}
}

func (h *handlers) register(r chi.Router) {
@@ -125,6 +202,7 @@ func (h *handlers) handleCreate(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	h.saveAfter(idx)
	writeJSON(w, http.StatusCreated, indexInfo{Params: idx.Params(), Length: idx.Len()})
}

@@ -156,6 +234,7 @@ func (h *handlers) handleDelete(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	h.deleteAfter(name)
	w.WriteHeader(http.StatusNoContent)
}

@@ -174,12 +253,13 @@ func (h *handlers) handleAdd(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "items must be non-empty", http.StatusBadRequest)
		return
	}
	// Per scrum O-W4 (Opus): pre-validate all items before any Add,
	// so a bad item at position N doesn't leave items 0..N-1 already
	// committed and item N rejected. Both checks (empty id, dim
	// mismatch) are local; running them up-front is O(N) extra work
	// that the success path already paid in idx.Add.
	dim := idx.Params().Dimension
	// Per scrum O-W4 (Opus, D5): pre-validate all items before any
	// Add, so a bad item at position N doesn't leave items 0..N-1
	// committed and item N rejected. Per scrum O-I3 (Opus, G1P):
	// extend pre-validation to cover NaN/Inf and zero-norm — these
	// were caught inside idx.Add but only after partial commits.
	params := idx.Params()
	dim := params.Dimension
	for j, it := range req.Items {
		if it.ID == "" {
			http.Error(w, "items["+strconv.Itoa(j)+"]: empty id", http.StatusBadRequest)
@@ -189,6 +269,10 @@ func (h *handlers) handleAdd(w http.ResponseWriter, r *http.Request) {
			http.Error(w, "items["+strconv.Itoa(j)+"]: dim mismatch (index="+strconv.Itoa(dim)+", got="+strconv.Itoa(len(it.Vector))+")", http.StatusBadRequest)
			return
		}
		if err := vectord.ValidateVector(it.Vector, params.Distance); err != nil {
			http.Error(w, "items["+strconv.Itoa(j)+"]: "+err.Error(), http.StatusBadRequest)
			return
		}
	}
	for j, it := range req.Items {
		if err := idx.Add(it.ID, it.Vector, it.Metadata); err != nil {
@@ -206,6 +290,9 @@ func (h *handlers) handleAdd(w http.ResponseWriter, r *http.Request) {
			return
		}
	}
	// One save per batch (post-loop), not per item. Per scrum
	// O-W4-style discipline: HTTP-batch boundary is the natural unit.
	h.saveAfter(idx)
	writeJSON(w, http.StatusOK, map[string]any{"added": len(req.Items), "length": idx.Len()})
}

@@ -6,6 +6,8 @@ import (
	"sync"
	"testing"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient"
)

// memStore is an in-memory Store fake for unit tests.
@@ -30,7 +32,7 @@ func (m *memStore) Get(_ context.Context, key string) ([]byte, error) {
	defer m.mu.Unlock()
	b, ok := m.data[key]
	if !ok {
		return nil, ErrKeyNotFound
		return nil, storeclient.ErrKeyNotFound
	}
	return b, nil
}

@@ -60,10 +60,13 @@ type GatewayConfig struct {
	VectordURL string `toml:"vectord_url"`
}

// VectordConfig adds vectord-specific knobs. Currently just bind;
// G1+ will add persistence-to-storaged URL + index params defaults.
// VectordConfig adds vectord-specific knobs. StoragedURL is
// optional — empty string disables persistence, useful for ephemeral
// dev or test runs. When set, indexes Save after every state change
// and Load on startup.
type VectordConfig struct {
	Bind string `toml:"bind"`
	Bind string `toml:"bind"`
	StoragedURL string `toml:"storaged_url"`
}

// QuerydConfig adds queryd-specific knobs. queryd talks DuckDB
@@ -129,7 +132,10 @@ func DefaultConfig() Config {
			CatalogdURL: "http://127.0.0.1:3212",
			MaxIngestBytes: 256 << 20, // 256 MiB; bump per deployment via lakehouse.toml
		},
		Vectord: VectordConfig{Bind: "127.0.0.1:3215"},
		Vectord: VectordConfig{
			Bind: "127.0.0.1:3215",
			StoragedURL: "http://127.0.0.1:3211",
		},
		Queryd: QuerydConfig{
			Bind: "127.0.0.1:3214",
			CatalogdURL: "http://127.0.0.1:3212",
@@ -1,9 +1,10 @@
// store_client.go — thin HTTP client to storaged. catalogd needs to
// PUT manifest Parquets, GET them on startup, and LIST the manifests
// directory. Staying inside an HTTP boundary (rather than reaching
// into storaged's package directly) preserves the service-boundary
// shape that gRPC will eventually formalize at G1+.
package catalogd
// Package storeclient is the shared HTTP client to storaged's
// /storage/* surface. catalogd uses Get/Put/List for manifest
// persistence; vectord (G1+) uses the same shape for HNSW index
// persistence. Extracting it here keeps the dep direction clean —
// a service that talks to storaged depends on this package, not
// on another service's package.
package storeclient

import (
	"bytes"
@@ -17,15 +18,19 @@ import (
	"time"
)

// StoreClient talks HTTP to the storaged service.
type StoreClient struct {
// ErrKeyNotFound mirrors storaged's not-found semantics on the
// caller side without exposing storaged's package types.
var ErrKeyNotFound = fmt.Errorf("storeclient: key not found")

// Client talks HTTP to a storaged service.
type Client struct {
	baseURL string
	hc *http.Client
}

// listResponse mirrors storaged's GET /storage/list shape:
//
//	{"prefix":"_catalog/manifests/","objects":[{Key,Size,ETag,LastModified}, ...]}
//	{"prefix":"_catalog/manifests/","objects":[{Key,Size,...}, ...]}
type listResponse struct {
	Prefix string `json:"prefix"`
	Objects []struct {
@@ -34,19 +39,18 @@ type listResponse struct {
	} `json:"objects"`
}

// NewStoreClient builds a client against the given storaged base URL
// (e.g. "http://127.0.0.1:3211"). Timeout covers manifest read-write
// only; rehydration of many manifests at startup runs sequentially
// and each call gets its own timeout window.
func NewStoreClient(baseURL string) *StoreClient {
	return &StoreClient{
// New builds a client against the given storaged base URL
// (e.g. "http://127.0.0.1:3211"). Timeout covers a single op only;
// callers that drive many ops sequentially get a fresh window per call.
func New(baseURL string) *Client {
	return &Client{
		baseURL: strings.TrimRight(baseURL, "/"),
		hc: &http.Client{Timeout: 30 * time.Second},
	}
}

// Put writes raw bytes at key. body is the encoded Parquet manifest.
func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error {
// Put writes raw bytes at key.
func (c *Client) Put(ctx context.Context, key string, body []byte) error {
	u := c.baseURL + "/storage/put/" + safeKey(key)
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body))
	if err != nil {
@@ -65,8 +69,8 @@ func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error {
	return nil
}

// Get reads the bytes at key. ErrKeyNotFound on 404.
func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) {
// Get reads the bytes at key. Returns ErrKeyNotFound on 404.
func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
	u := c.baseURL + "/storage/get/" + safeKey(key)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
	if err != nil {
@@ -87,9 +91,29 @@ func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) {
	return io.ReadAll(resp.Body)
}

// List returns the keys under prefix. Object metadata beyond Key is
// ignored — catalogd only needs the keys to drive rehydration.
func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) {
// Delete removes the key. Idempotent — a missing key is not an
// error (matches storaged's underlying S3 DeleteObject semantics).
func (c *Client) Delete(ctx context.Context, key string) error {
	u := c.baseURL + "/storage/delete/" + safeKey(key)
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u, nil)
	if err != nil {
		return fmt.Errorf("delete req: %w", err)
	}
	resp, err := c.hc.Do(req)
	if err != nil {
		return fmt.Errorf("delete do: %w", err)
	}
	defer drainAndClose(resp.Body)
	if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
		preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return fmt.Errorf("delete %s: status %d: %s", key, resp.StatusCode, string(preview))
	}
	return nil
}

// List returns the keys under prefix. Caller-side filtering on
// suffix or other shape lives outside this package.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
	u := c.baseURL + "/storage/list?prefix=" + url.QueryEscape(prefix)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
	if err != nil {
@@ -115,10 +139,6 @@ func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error)
	return out, nil
}

// ErrKeyNotFound mirrors storaged's not-found semantics on the catalogd
// side without exposing storaged's package types.
var ErrKeyNotFound = fmt.Errorf("catalogd store: key not found")

// safeKey URL-escapes path segments while preserving "/". storaged's
// chi `/storage/<verb>/*` routes accept literal slashes in the
// wildcard match, so we only escape the segments, not the separators.
@@ -131,9 +151,8 @@ func safeKey(key string) string {
}

// drainAndClose reads any remaining body bytes (capped at 64 KiB) and
// closes the body. Per scrum S6 (Qwen): preview-then-close on error
// paths leaves bytes in the kernel buffer, breaking HTTP/1.1 keep-
// alive reuse and slowly leaking sockets.
// closes the body — keeps HTTP/1.1 keep-alive pool reuse healthy on
// error paths.
func drainAndClose(body io.ReadCloser) {
	_, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10))
	_ = body.Close()
@@ -12,6 +12,7 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"sync"

@@ -200,8 +201,15 @@ func (i *Index) resetGraphLocked() {
	i.g = g
}

// ValidateVector is the exported form of validateVector — the HTTP
// handler pre-validates batches before committing, so it needs the
// same predicate Add uses internally. Per scrum O-I3 (G1P).
func ValidateVector(vec []float32, distance string) error {
	return validateVector(vec, distance)
}

// validateVector rejects vectors that would poison the HNSW
// graph or produce NaN distances. Per scrum O-W3 (Opus).
// graph or produce NaN distances. Per scrum O-W3 (Opus, G1).
func validateVector(vec []float32, distance string) error {
	var sumSq float64
	for j, v := range vec {
@@ -259,6 +267,75 @@ func (i *Index) Search(query []float32, k int) ([]Result, error) {
	return out, nil
}

// IndexEnvelope is the JSON shape persisted alongside the binary
// HNSW graph bytes. params + metadata + format version travel
// together; the graph itself is opaque binary that round-trips
// through hnsw.Graph.Export / Import.
type IndexEnvelope struct {
	Version int `json:"version"`
	Params IndexParams `json:"params"`
	Metadata map[string]json.RawMessage `json:"metadata"`
}

// envelopeVersion bumps when the on-disk JSON shape changes
// incompatibly. Reading a future version returns ErrVersionMismatch
// rather than producing a half-decoded index.
const envelopeVersion = 1

// ErrVersionMismatch is returned by DecodeIndex when the envelope
// claims a version this build doesn't understand.
var ErrVersionMismatch = errors.New("vectord: unknown envelope version")

// Encode writes the index's JSON envelope (params + metadata) and
// the binary HNSW graph bytes through two writers. Two-stream
// shape lets the persistor write each to a distinct storaged key
// without reframing.
//
// envelopeW receives params+metadata as JSON; graphW receives the
// raw output of hnsw.Graph.Export.
func (i *Index) Encode(envelopeW, graphW io.Writer) error {
	i.mu.RLock()
	defer i.mu.RUnlock()

	env := IndexEnvelope{
		Version: envelopeVersion,
		Params: i.params,
		Metadata: i.meta,
	}
	if err := json.NewEncoder(envelopeW).Encode(env); err != nil {
		return fmt.Errorf("encode envelope: %w", err)
	}
	if err := i.g.Export(graphW); err != nil {
		return fmt.Errorf("export graph: %w", err)
	}
	return nil
}

// DecodeIndex reconstructs an Index from a previously-Encoded pair
// of streams. The returned Index is independent — closing either
// reader after this call doesn't affect the result.
func DecodeIndex(envelopeR, graphR io.Reader) (*Index, error) {
	var env IndexEnvelope
	if err := json.NewDecoder(envelopeR).Decode(&env); err != nil {
		return nil, fmt.Errorf("decode envelope: %w", err)
	}
	if env.Version != envelopeVersion {
		return nil, fmt.Errorf("%w: have %d, got %d",
			ErrVersionMismatch, envelopeVersion, env.Version)
	}
	idx, err := NewIndex(env.Params)
	if err != nil {
		return nil, err
	}
	if err := idx.g.Import(graphR); err != nil {
		return nil, fmt.Errorf("import graph: %w", err)
	}
	if env.Metadata != nil {
		idx.meta = env.Metadata
	}
	return idx, nil
}

// Lookup returns the stored vector + metadata for an id.
//
// Per scrum O-W1 (Opus): the vector is COPIED before return.
@@ -1,6 +1,7 @@
package vectord

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
@@ -129,6 +130,65 @@ func TestIndex_ConcurrentSearchAdd(t *testing.T) {
	wg.Wait()
}

func TestEncodeDecode_RoundTrip(t *testing.T) {
	const n = 16
	src, _ := NewIndex(IndexParams{Name: "x", Dimension: n, Distance: DistanceCosine})
	mkVec := func(i int) []float32 {
		// Each vector is a unique unit vector along axis (i mod n) with
		// a tiny perturbation on a different axis — recall=1 is robust
		// without colliding under cosine.
		v := make([]float32, n)
		v[i%n] = 1.0
		v[(i+1)%n] = 0.001
		return v
	}
	for i := 0; i < n; i++ {
		meta := json.RawMessage(fmt.Sprintf(`{"row":%d}`, i))
		if err := src.Add(fmt.Sprintf("id-%02d", i), mkVec(i), meta); err != nil {
			t.Fatal(err)
		}
	}

	var envBuf, graphBuf bytes.Buffer
	if err := src.Encode(&envBuf, &graphBuf); err != nil {
		t.Fatalf("Encode: %v", err)
	}

	dst, err := DecodeIndex(&envBuf, &graphBuf)
	if err != nil {
		t.Fatalf("DecodeIndex: %v", err)
	}
	if dst.Len() != src.Len() {
		t.Errorf("Len: src=%d dst=%d", src.Len(), dst.Len())
	}
	if dst.Params() != src.Params() {
		t.Errorf("Params: src=%+v dst=%+v", src.Params(), dst.Params())
	}
	for i := 0; i < n; i++ {
		hits, err := dst.Search(mkVec(i), 1)
		if err != nil {
			t.Fatal(err)
		}
		want := fmt.Sprintf("id-%02d", i)
		if len(hits) == 0 || hits[0].ID != want {
			t.Errorf("Search after decode: id-%d → %v, want %s", i, hits, want)
			continue
		}
		wantMeta := fmt.Sprintf(`{"row":%d}`, i)
		if string(hits[0].Metadata) != wantMeta {
			t.Errorf("metadata after decode: got %q, want %q", hits[0].Metadata, wantMeta)
		}
	}
}

func TestDecodeIndex_VersionMismatch(t *testing.T) {
	bad := bytes.NewBufferString(`{"version":999,"params":{"name":"x","dimension":4}}`)
	_, err := DecodeIndex(bad, bytes.NewReader(nil))
	if !errors.Is(err, ErrVersionMismatch) {
		t.Errorf("expected ErrVersionMismatch, got %v", err)
	}
}

func TestRegistry_CreateGetDelete(t *testing.T) {
	r := NewRegistry()
	idx, err := r.Create(IndexParams{Name: "workers", Dimension: 4})
184
internal/vectord/persistor.go
Normal file
@@ -0,0 +1,184 @@
|
||||
// persistor.go — drives Index ↔ storaged round-trips for vectord.
|
||||
// Single-file framed format per index (post-G1P-scrum C1 fix —
|
||||
// the prior two-file format had a torn-write class where a
|
||||
// successful envelope PUT followed by a failed graph PUT would
|
||||
// pass the "both present" List filter and load mismatched state):
|
||||
//
|
||||
// _vectors/<name>.lhv1 — single binary blob, framed:
|
||||
// [4 bytes magic "LHV1"]
|
||||
// [4 bytes envelope_len uint32 big-endian]
|
||||
// [envelope bytes — JSON envelope from Index.Encode]
|
||||
// [graph bytes — rest of file, raw hnsw.Graph.Export]
|
||||
//
|
||||
// One Put → no torn-write window. One Get → no need to filter
|
||||
// half-saved orphans on List.
|
||||
package vectord
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient"
|
||||
)
|
||||
|
||||
// VectorPrefix is the storaged key prefix that holds vector index
|
||||
// state. Objects under this prefix are NOT user vectors — those
|
||||
// live in the index in memory; this is the persistence sidecar.
|
||||
const VectorPrefix = "_vectors/"
|
||||
|
||||
// fileSuffix is the extension for the framed combined file.
|
||||
// Versioned in the magic+frame so future formats can land
|
||||
// alongside without rewriting all existing files.
|
||||
const fileSuffix = ".lhv1"
|
||||
|
||||
// magic is the 4-byte file header. Lakehouse-go Vectord v1.
|
||||
var magic = [4]byte{'L', 'H', 'V', '1'}
|
||||
|
||||
// Store is the subset of the storeclient API that persistor needs.
|
||||
// Defined as an interface so unit tests can inject a fake without
|
||||
// spinning up real storaged.
|
||||
type Store interface {
|
||||
Put(ctx context.Context, key string, body []byte) error
|
||||
Get(ctx context.Context, key string) ([]byte, error)
|
||||
Delete(ctx context.Context, key string) error
|
||||
List(ctx context.Context, prefix string) ([]string, error)
|
||||
}
|
||||
|
||||
// ErrKeyMissing is returned by Load when the persisted file is
|
||||
// absent. The single-file format means partial persistence isn't
|
||||
// a concern (post-G1P-scrum C1) — torn-write would orphan a file
|
||||
// with bad framing, surfacing as a parse error not a silent miss.
|
||||
var ErrKeyMissing = errors.New("persistor: index file missing")
|
||||
|
||||
// ErrBadFormat is returned by Load when the persisted bytes don't
|
||||
// match the expected magic or framing. Possible causes: bit-rot,
|
||||
// version skew (a future format we don't speak), partial PUT
|
||||
// before storaged grew transactional writes, or operator-edited.
|
||||
var ErrBadFormat = errors.New("persistor: bad file format")
|
||||
|
||||
// Persistor wires Index Encode/Decode to a Store.
|
||||
type Persistor struct {
|
||||
store Store
|
||||
}
|
||||
|
||||
func NewPersistor(s Store) *Persistor {
|
||||
return &Persistor{store: s}
|
||||
}
|
||||
|
||||
// Save encodes idx into a single framed blob and writes it.
|
||||
// Atomic at the storaged layer: one Put → no torn-write hazard.
|
||||
func (p *Persistor) Save(ctx context.Context, idx *Index) error {
|
||||
var envBuf, graphBuf bytes.Buffer
|
||||
if err := idx.Encode(&envBuf, &graphBuf); err != nil {
|
||||
return fmt.Errorf("encode %q: %w", idx.params.Name, err)
|
||||
}
|
||||
body := frame(envBuf.Bytes(), graphBuf.Bytes())
|
||||
if err := p.store.Put(ctx, fileKey(idx.params.Name), body); err != nil {
|
||||
return fmt.Errorf("put: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load reconstructs an Index from the persisted single-file blob.
|
||||
// ErrKeyMissing if the file is absent; ErrBadFormat on framing
|
||||
// mismatch (corruption / version skew / operator edit).
|
||||
func (p *Persistor) Load(ctx context.Context, name string) (*Index, error) {
|
||||
body, err := p.store.Get(ctx, fileKey(name))
|
||||
if err != nil {
|
||||
// Per scrum O-B1 (Opus): use errors.Is on the typed
|
||||
// sentinel, not substring matching. The prior substring
|
||||
// approach silently misclassified any 5xx body that
|
||||
// happened to contain "key not found" as missing.
|
||||
if errors.Is(err, storeclient.ErrKeyNotFound) {
|
||||
return nil, fmt.Errorf("%w: %q", ErrKeyMissing, name)
|
||||
}
|
||||
return nil, fmt.Errorf("get %q: %w", name, err)
|
||||
}
|
||||
envBytes, graphBytes, err := unframe(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unframe %q: %w", name, err)
|
||||
}
|
||||
idx, err := DecodeIndex(bytes.NewReader(envBytes), bytes.NewReader(graphBytes))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode %q: %w", name, err)
|
||||
}
|
||||
return idx, nil
|
||||
}

// Delete removes the persisted file for an index. Idempotent on
// missing — storaged DELETE is itself idempotent.
func (p *Persistor) Delete(ctx context.Context, name string) error {
	if err := p.store.Delete(ctx, fileKey(name)); err != nil {
		return fmt.Errorf("delete %q: %w", name, err)
	}
	return nil
}

// List returns persisted index names, sorted. Single-file format
// means no half-saved-orphan filtering needed — bad framing on Load
// surfaces as ErrBadFormat per index, which the rehydrate caller
// can log and skip.
func (p *Persistor) List(ctx context.Context) ([]string, error) {
	keys, err := p.store.List(ctx, VectorPrefix)
	if err != nil {
		return nil, fmt.Errorf("list: %w", err)
	}
	out := make([]string, 0, len(keys))
	for _, k := range keys {
		if !strings.HasPrefix(k, VectorPrefix) || !strings.HasSuffix(k, fileSuffix) {
			continue
		}
		name := strings.TrimSuffix(strings.TrimPrefix(k, VectorPrefix), fileSuffix)
		if name != "" {
			out = append(out, name)
		}
	}
	sort.Strings(out)
	return out, nil
}

func fileKey(name string) string { return VectorPrefix + name + fileSuffix }

// frame produces the on-disk byte layout:
//
//	[4 bytes magic "LHV1"]
//	[4 bytes envelope_len uint32 big-endian]
//	[envelope bytes]
//	[graph bytes — rest of file]
func frame(envBytes, graphBytes []byte) []byte {
	out := make([]byte, 0, 8+len(envBytes)+len(graphBytes))
	out = append(out, magic[:]...)
	var lenBuf [4]byte
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(envBytes)))
	out = append(out, lenBuf[:]...)
	out = append(out, envBytes...)
	out = append(out, graphBytes...)
	return out
}

// unframe reverses frame. Returns ErrBadFormat on any mismatch.
func unframe(body []byte) (envBytes, graphBytes []byte, err error) {
	if len(body) < 8 {
		return nil, nil, fmt.Errorf("%w: body too short (%d bytes)", ErrBadFormat, len(body))
	}
	if !bytes.Equal(body[:4], magic[:]) {
		return nil, nil, fmt.Errorf("%w: bad magic %q", ErrBadFormat, body[:4])
	}
	envLen := binary.BigEndian.Uint32(body[4:8])
	// Compare in uint64 so a huge envelope_len can't wrap int on
	// 32-bit platforms and slip past the bounds check.
	if uint64(envLen)+8 > uint64(len(body)) {
		return nil, nil, fmt.Errorf("%w: envelope_len=%d exceeds body=%d", ErrBadFormat, envLen, len(body))
	}
	envBytes = body[8 : 8+envLen]
	graphBytes = body[8+envLen:]
	return envBytes, graphBytes, nil
}

// (io import retained for future Save/Load that streams; current
// path is buffer-based.)
var _ = io.Discard
157
internal/vectord/persistor_test.go
Normal file
@ -0,0 +1,157 @@
package vectord

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"testing"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient"
)

// memStore is an in-memory Store fake for unit tests. Returns the
// real storeclient.ErrKeyNotFound sentinel so persistor.Load's
// errors.Is path is exercised faithfully.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} }

func (m *memStore) Put(_ context.Context, key string, body []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	cp := make([]byte, len(body))
	copy(cp, body)
	m.data[key] = cp
	return nil
}

func (m *memStore) Get(_ context.Context, key string) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	b, ok := m.data[key]
	if !ok {
		return nil, storeclient.ErrKeyNotFound
	}
	return b, nil
}

func (m *memStore) Delete(_ context.Context, key string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.data, key)
	return nil
}

func (m *memStore) List(_ context.Context, prefix string) ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := []string{}
	for k := range m.data {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	return out, nil
}

func TestPersistor_SaveLoad_RoundTrip(t *testing.T) {
	const n = 16
	store := newMemStore()
	p := NewPersistor(store)

	src, _ := NewIndex(IndexParams{Name: "workers", Dimension: n, Distance: DistanceCosine})
	for i := 0; i < n; i++ {
		v := make([]float32, n)
		v[i%n] = 1.0
		v[(i+1)%n] = 0.001
		_ = src.Add(fmt.Sprintf("w-%02d", i), v, json.RawMessage(fmt.Sprintf(`{"i":%d}`, i)))
	}
	if err := p.Save(context.Background(), src); err != nil {
		t.Fatal(err)
	}

	dst, err := p.Load(context.Background(), "workers")
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	if dst.Len() != src.Len() {
		t.Errorf("Len: src=%d dst=%d", src.Len(), dst.Len())
	}
	for i := 0; i < n; i++ {
		v := make([]float32, n)
		v[i%n] = 1.0
		v[(i+1)%n] = 0.001
		hits, _ := dst.Search(v, 1)
		want := fmt.Sprintf("w-%02d", i)
		if len(hits) == 0 || hits[0].ID != want {
			t.Errorf("recall after Load: i=%d hits=%v", i, hits)
		}
	}
}

func TestPersistor_Load_MissingReturnsErrKeyMissing(t *testing.T) {
	p := NewPersistor(newMemStore())
	_, err := p.Load(context.Background(), "nope")
	if !errors.Is(err, ErrKeyMissing) {
		t.Errorf("expected ErrKeyMissing, got %v", err)
	}
}

func TestPersistor_Delete_RemovesFile(t *testing.T) {
	// Single-file format: Delete removes the one .lhv1 blob.
	store := newMemStore()
	p := NewPersistor(store)
	src, _ := NewIndex(IndexParams{Name: "x", Dimension: 4})
	_ = src.Add("a", []float32{1, 0, 0, 0}, nil)
	_ = p.Save(context.Background(), src)

	if err := p.Delete(context.Background(), "x"); err != nil {
		t.Fatal(err)
	}
	if _, err := p.Load(context.Background(), "x"); !errors.Is(err, ErrKeyMissing) {
		t.Errorf("after Delete, Load should ErrKeyMissing, got %v", err)
	}
}

func TestPersistor_List_FiltersBySuffix(t *testing.T) {
	// Single-file format means no orphan-pair concept; we just
	// filter on the .lhv1 suffix. Garbage files under VectorPrefix
	// (e.g. operator drops something there) shouldn't show up.
	store := newMemStore()
	p := NewPersistor(store)

	src, _ := NewIndex(IndexParams{Name: "alpha", Dimension: 4})
	_ = src.Add("a", []float32{1, 0, 0, 0}, nil)
	_ = p.Save(context.Background(), src)

	src2, _ := NewIndex(IndexParams{Name: "beta", Dimension: 4})
	_ = src2.Add("b", []float32{0, 1, 0, 0}, nil)
	_ = p.Save(context.Background(), src2)

	// A garbage file under the prefix that shouldn't match.
	_ = store.Put(context.Background(), VectorPrefix+"README.txt", []byte("not an index"))

	got, err := p.List(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != 2 || got[0] != "alpha" || got[1] != "beta" {
		t.Errorf("List: got %v, want [alpha beta]", got)
	}
}

func TestPersistor_Load_BadFormat(t *testing.T) {
	store := newMemStore()
	p := NewPersistor(store)
	// Manually plant a file with bad magic → Load surfaces ErrBadFormat.
	_ = store.Put(context.Background(), fileKey("corrupt"), []byte("not lhv1 framing"))
	_, err := p.Load(context.Background(), "corrupt")
	if !errors.Is(err, ErrBadFormat) {
		t.Errorf("expected ErrBadFormat, got %v", err)
	}
}
@ -47,6 +47,21 @@ func (r *Registry) Create(p IndexParams) (*Index, error) {
	return idx, nil
}

// RegisterPrebuilt installs an already-built Index under its
// params.Name. Used by the persistor's rehydrate path — the index
// has been Decoded from disk and shouldn't be re-built fresh by
// Create. Returns ErrIndexAlreadyExists if the name is taken.
func (r *Registry) RegisterPrebuilt(idx *Index) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	name := idx.Params().Name
	if _, exists := r.indexes[name]; exists {
		return fmt.Errorf("%w: %q", ErrIndexAlreadyExists, name)
	}
	r.indexes[name] = idx
	return nil
}

// Get returns the index for name, or ErrIndexNotFound.
func (r *Registry) Get(name string) (*Index, error) {
	r.mu.RLock()
@ -30,6 +30,8 @@ max_ingest_bytes = 268435456

[vectord]
bind = "127.0.0.1:3215"
# Optional — set to empty string to disable persistence (dev/test).
storaged_url = "http://127.0.0.1:3211"

[queryd]
bind = "127.0.0.1:3214"
@ -46,10 +46,10 @@ poll_health() {
}

echo "[g1-smoke] launching vectord → gateway..."
-./bin/vectord > /tmp/vectord.log 2>&1 &
+./bin/vectord -config scripts/g1_smoke.toml > /tmp/vectord.log 2>&1 &
PIDS+=($!)
poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; exit 1; }
-./bin/gateway > /tmp/gateway.log 2>&1 &
+./bin/gateway -config scripts/g1_smoke.toml > /tmp/gateway.log 2>&1 &
PIDS+=($!)
poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }

@ -81,14 +81,15 @@ echo "[g1-smoke] add batch of 200 vectors:"
# Build a batch payload with 200 items. ID encodes index; vector
# is a deterministic spread so we can assert recall on a known id.
python3 - <<EOF > "$TMP/batch.json"
-import json, math
+import json
items = []
for i in range(200):
-    # Each vector is unit-norm with one dominant axis cycling through 8 dims.
+    # Each vector is unit-norm-ish with one dominant axis cycling
+    # through DIM dims, and a UNIQUE perturbation derived from i itself
+    # (no modulo — would create collisions when 200/DIM rolls over).
    v = [0.0] * $DIM
    v[i % $DIM] = 1.0
-    # Add a tiny perturbation per id so vectors are distinct.
-    v[(i+1) % $DIM] = (i % 17) * 0.01
+    v[(i+1) % $DIM] = i * 0.0001  # unique per i
    items.append({"id": f"w-{i:03d}", "vector": v, "metadata": {"row": i}})
print(json.dumps({"items": items}))
EOF

@ -110,7 +111,7 @@ import json
i = 42
v = [0.0] * $DIM
v[i % $DIM] = 1.0
-v[(i+1) % $DIM] = (i % 17) * 0.01
+v[(i+1) % $DIM] = i * 0.0001  # match the unique-per-i perturbation
print(json.dumps({"vector": v, "k": 3}))
EOF
SEARCH_RESP="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \
24
scripts/g1_smoke.toml
Normal file
@ -0,0 +1,24 @@
# g1_smoke override — disables vectord persistence so the in-memory
# API test stays pure (no rehydrate-from-storaged contamination).
# g1p_smoke covers the persistence path.

[gateway]
bind = "127.0.0.1:3110"
storaged_url = "http://127.0.0.1:3211"
catalogd_url = "http://127.0.0.1:3212"
ingestd_url = "http://127.0.0.1:3213"
queryd_url = "http://127.0.0.1:3214"
vectord_url = "http://127.0.0.1:3215"

[storaged]
bind = "127.0.0.1:3211"

[vectord]
bind = "127.0.0.1:3215"
storaged_url = "" # disable persistence — pure in-memory for this smoke

[s3]
endpoint = "http://localhost:9000"
region = "us-east-1"
bucket = "lakehouse-go-primary"
use_path_style = true
167
scripts/g1p_smoke.sh
Executable file
@ -0,0 +1,167 @@
#!/usr/bin/env bash
# G1P smoke — vectord persistence across restart.
#
# Validates:
# - Create index + add 50 vectors → Save fires (storaged shows
#   _vectors/persist_demo.lhv1)
# - Search w-001 → recall=1 with distance ≈ 0
# - Kill vectord → restart → initial Refresh enumerates persisted
#   indexes, Loads each back into the registry
# - Same search post-restart → still recall=1 (state survived)
# - DELETE → the .lhv1 file removed from storaged → restart → index gone
#
# Requires storaged + vectord both up.
#
# Usage: ./scripts/g1p_smoke.sh

set -euo pipefail
cd "$(dirname "$0")/.."

export PATH="$PATH:/usr/local/go/bin"

echo "[g1p-smoke] building storaged + vectord + gateway..."
go build -o bin/ ./cmd/storaged ./cmd/vectord ./cmd/gateway

pkill -f "bin/(storaged|vectord|gateway)" 2>/dev/null || true
sleep 0.3

PIDS=()
TMP="$(mktemp -d)"
cleanup() {
  echo "[g1p-smoke] cleanup"
  for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done
  rm -rf "$TMP"
}
trap cleanup EXIT INT TERM

poll_health() {
  local port="$1" deadline=$(($(date +%s) + 5))
  while [ "$(date +%s)" -lt "$deadline" ]; do
    if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
    sleep 0.05
  done
  return 1
}

NAME="persist_demo"
DIM=8

echo "[g1p-smoke] launching storaged..."
./bin/storaged > /tmp/storaged.log 2>&1 &
PIDS+=($!)
poll_health 3211 || { echo "storaged failed"; tail /tmp/storaged.log; exit 1; }

# Clean any prior persisted state for this index name.
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/_vectors/$NAME.lhv1" || true

launch_vectord() {
  ./bin/vectord > /tmp/vectord.log 2>&1 &
  VECTORD_PID=$!
  PIDS+=($VECTORD_PID)
  poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; return 1; }
}

launch_gateway() {
  ./bin/gateway > /tmp/gateway.log 2>&1 &
  PIDS+=($!)
  poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; return 1; }
}

echo "[g1p-smoke] launching vectord (round 1) → gateway..."
launch_vectord
launch_gateway

FAILED=0

echo "[g1p-smoke] create index + add 50 vectors:"
HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/vectors/index \
  -H 'Content-Type: application/json' \
  -d "{\"name\":\"$NAME\",\"dimension\":$DIM,\"distance\":\"cosine\"}")"
if [ "$HTTP" != "201" ]; then echo " ✗ create → $HTTP"; FAILED=1; fi

# Build a 50-vector batch.
python3 - <<EOF > "$TMP/batch.json"
import json
items = []
for i in range(50):
    v = [0.0] * $DIM
    v[i % $DIM] = 1.0
    v[(i+1) % $DIM] = (i % 13) * 0.01
    items.append({"id": f"w-{i:03d}", "vector": v, "metadata": {"row": i}})
print(json.dumps({"items": items}))
EOF
LEN="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/add" \
  -H 'Content-Type: application/json' \
  -d @"$TMP/batch.json" | jq -r '.length')"
if [ "$LEN" = "50" ]; then echo " ✓ added 50 → length=50"; else echo " ✗ add → length=$LEN"; FAILED=1; fi

echo "[g1p-smoke] verify storaged has the persistence file:"
FILE_KEY="_vectors/$NAME.lhv1"
FILE_HTTP="$(curl -sS -o /dev/null -w '%{http_code}' "http://127.0.0.1:3211/storage/get/$FILE_KEY")"
if [ "$FILE_HTTP" = "200" ]; then
  echo " ✓ $FILE_KEY present in storaged"
else
  echo " ✗ file=$FILE_HTTP"; FAILED=1
fi

echo "[g1p-smoke] search pre-restart:"
python3 - <<EOF > "$TMP/q.json"
import json
i = 1
v = [0.0] * $DIM
v[i % $DIM] = 1.0
v[(i+1) % $DIM] = (i % 13) * 0.01
print(json.dumps({"vector": v, "k": 1}))
EOF
TOP_PRE="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \
  -H 'Content-Type: application/json' -d @"$TMP/q.json" | jq -r '.results[0].id')"
if [ "$TOP_PRE" = "w-001" ]; then echo " ✓ pre-restart top hit = w-001"; else echo " ✗ top=$TOP_PRE"; FAILED=1; fi

echo "[g1p-smoke] kill + restart vectord (rehydrate path):"
kill $VECTORD_PID 2>/dev/null || true
wait $VECTORD_PID 2>/dev/null || true
sleep 0.3
launch_vectord
sleep 0.2 # let initial Refresh complete (no API to query rehydration done)

echo "[g1p-smoke] vectord rehydrated index list shows $NAME:"
NAMES_HTTP="$(curl -sS http://127.0.0.1:3110/v1/vectors/index | jq -r '.names | length')"
if [ "$NAMES_HTTP" = "1" ]; then echo " ✓ list count=1 after restart"; else echo " ✗ count=$NAMES_HTTP"; FAILED=1; fi

INFO_LEN="$(curl -sS "http://127.0.0.1:3110/v1/vectors/index/$NAME" | jq -r '.length')"
if [ "$INFO_LEN" = "50" ]; then echo " ✓ length=50 after restart (state survived)"; else echo " ✗ length=$INFO_LEN"; FAILED=1; fi

echo "[g1p-smoke] search post-restart:"
TOP_POST="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \
  -H 'Content-Type: application/json' -d @"$TMP/q.json" | jq -r '.results[0].id')"
DIST_POST="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \
  -H 'Content-Type: application/json' -d @"$TMP/q.json" | jq -r '.results[0].distance')"
if [ "$TOP_POST" = "w-001" ]; then
  echo " ✓ post-restart top hit = w-001 (dist=$DIST_POST)"
else
  echo " ✗ post-restart top=$TOP_POST"; FAILED=1
fi

echo "[g1p-smoke] DELETE then restart → index gone:"
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3110/v1/vectors/index/$NAME"
FILE_HTTP="$(curl -sS -o /dev/null -w '%{http_code}' "http://127.0.0.1:3211/storage/get/$FILE_KEY")"
if [ "$FILE_HTTP" = "404" ]; then
  echo " ✓ persistence file removed from storaged"
else
  echo " ✗ file=$FILE_HTTP after delete"; FAILED=1
fi
kill $VECTORD_PID 2>/dev/null || true
wait $VECTORD_PID 2>/dev/null || true
sleep 0.3
launch_vectord
sleep 0.2
NAMES_HTTP="$(curl -sS http://127.0.0.1:3110/v1/vectors/index | jq -r '.names | length')"
if [ "$NAMES_HTTP" = "0" ]; then echo " ✓ post-delete restart list count=0"; else echo " ✗ count=$NAMES_HTTP"; FAILED=1; fi

if [ "$FAILED" -eq 0 ]; then
  echo "[g1p-smoke] G1P acceptance gate: PASSED"
  exit 0
else
  echo "[g1p-smoke] G1P acceptance gate: FAILED"
  exit 1
fi