diff --git a/cmd/catalogd/main.go b/cmd/catalogd/main.go
index da0303d..9f24812 100644
--- a/cmd/catalogd/main.go
+++ b/cmd/catalogd/main.go
@@ -18,6 +18,7 @@ import (
 	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
 	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient"
 )
 
 func main() {
@@ -34,7 +35,7 @@ func main() {
 		os.Exit(1)
 	}
 
-	store := catalogd.NewStoreClient(cfg.Catalogd.StoragedURL)
+	store := storeclient.New(cfg.Catalogd.StoragedURL)
 	registry := catalogd.NewRegistry(store)
 
 	rehydrateCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
diff --git a/cmd/vectord/main.go b/cmd/vectord/main.go
index 605ea71..1ee8e89 100644
--- a/cmd/vectord/main.go
+++ b/cmd/vectord/main.go
@@ -1,15 +1,14 @@
-// vectord is the vector-search service. In-memory HNSW indexes
-// keyed by string IDs with optional opaque JSON metadata. Wraps
+// vectord is the vector-search service. HNSW indexes keyed by
+// string IDs with optional opaque JSON metadata. Wraps
 // github.com/coder/hnsw (pure Go, no cgo).
 //
-// G1 scope: in-memory only, single-process. Persistence to storaged
-// + rehydrate-on-restart is the next piece. The HTTP surface is
-// stable enough to build the staffing co-pilot's "find workers
-// like X" path on top right now — the indexes just have to be
-// rebuilt after a restart.
+// G1 + persistence: indexes are persisted to storaged at
+// _vectors/<name>.lhv1 and rehydrated on startup. Setting
+// [vectord].storaged_url empty disables persistence (dev mode).
package main import ( + "context" "encoding/json" "errors" "flag" @@ -18,10 +17,12 @@ import ( "os" "strconv" "strings" + "time" "github.com/go-chi/chi/v5" "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient" "git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord" ) @@ -43,6 +44,24 @@ func main() { h := &handlers{reg: vectord.NewRegistry()} + // Persistence is optional — empty StoragedURL = dev/ephemeral mode. + if cfg.Vectord.StoragedURL != "" { + h.persist = vectord.NewPersistor(storeclient.New(cfg.Vectord.StoragedURL)) + + // Rehydrate any persisted indexes at startup. Failures are + // logged-not-fatal: storaged might be coming up after vectord, + // and an index that failed to load is still recoverable by + // re-ingesting on top of an empty registry. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + n, err := h.rehydrate(ctx) + cancel() + if err != nil { + slog.Warn("rehydrate", "err", err, "loaded", n) + } else { + slog.Info("rehydrated", "indexes", n) + } + } + if err := shared.Run("vectord", cfg.Vectord.Bind, h.register); err != nil { slog.Error("server", "err", err) os.Exit(1) @@ -50,7 +69,65 @@ func main() { } type handlers struct { - reg *vectord.Registry + reg *vectord.Registry + persist *vectord.Persistor // nil when persistence is disabled +} + +// rehydrate enumerates persisted indexes and loads each into the +// registry. Returns the count of successfully loaded indexes plus +// the first error (if any) — caller decides fatality. 
+func (h *handlers) rehydrate(ctx context.Context) (int, error) { + if h.persist == nil { + return 0, nil + } + names, err := h.persist.List(ctx) + if err != nil { + return 0, err + } + loaded := 0 + for _, name := range names { + idx, err := h.persist.Load(ctx, name) + if err != nil { + slog.Warn("rehydrate skip", "name", name, "err", err) + continue + } + // The registry's Create rebuilds an empty Index from params; + // we want the LOADED one (with vectors). Bypass via a + // helper that registers a pre-built Index. + if err := h.reg.RegisterPrebuilt(idx); err != nil { + slog.Warn("rehydrate register", "name", name, "err", err) + continue + } + loaded++ + } + return loaded, nil +} + +// saveAfter is the post-write persistence hook. Logs-not-fatal: +// in-memory state is the source of truth in flight; a failed save +// gets re-attempted on the next mutation, and the operator log +// shows the storaged outage. +func (h *handlers) saveAfter(idx *vectord.Index) { + if h.persist == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := h.persist.Save(ctx, idx); err != nil { + slog.Warn("persist save", "name", idx.Params().Name, "err", err) + } +} + +// deleteAfter mirrors saveAfter for the Delete path. 
+func (h *handlers) deleteAfter(name string) { + if h.persist == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := h.persist.Delete(ctx, name); err != nil { + slog.Warn("persist delete", "name", name, "err", err) + } } func (h *handlers) register(r chi.Router) { @@ -125,6 +202,7 @@ func (h *handlers) handleCreate(w http.ResponseWriter, r *http.Request) { http.Error(w, "internal", http.StatusInternalServerError) return } + h.saveAfter(idx) writeJSON(w, http.StatusCreated, indexInfo{Params: idx.Params(), Length: idx.Len()}) } @@ -156,6 +234,7 @@ func (h *handlers) handleDelete(w http.ResponseWriter, r *http.Request) { http.Error(w, "internal", http.StatusInternalServerError) return } + h.deleteAfter(name) w.WriteHeader(http.StatusNoContent) } @@ -174,12 +253,13 @@ func (h *handlers) handleAdd(w http.ResponseWriter, r *http.Request) { http.Error(w, "items must be non-empty", http.StatusBadRequest) return } - // Per scrum O-W4 (Opus): pre-validate all items before any Add, - // so a bad item at position N doesn't leave items 0..N-1 already - // committed and item N rejected. Both checks (empty id, dim - // mismatch) are local; running them up-front is O(N) extra work - // that the success path already paid in idx.Add. - dim := idx.Params().Dimension + // Per scrum O-W4 (Opus, D5): pre-validate all items before any + // Add, so a bad item at position N doesn't leave items 0..N-1 + // committed and item N rejected. Per scrum O-I3 (Opus, G1P): + // extend pre-validation to cover NaN/Inf and zero-norm — these + // were caught inside idx.Add but only after partial commits. 
+ params := idx.Params() + dim := params.Dimension for j, it := range req.Items { if it.ID == "" { http.Error(w, "items["+strconv.Itoa(j)+"]: empty id", http.StatusBadRequest) @@ -189,6 +269,10 @@ func (h *handlers) handleAdd(w http.ResponseWriter, r *http.Request) { http.Error(w, "items["+strconv.Itoa(j)+"]: dim mismatch (index="+strconv.Itoa(dim)+", got="+strconv.Itoa(len(it.Vector))+")", http.StatusBadRequest) return } + if err := vectord.ValidateVector(it.Vector, params.Distance); err != nil { + http.Error(w, "items["+strconv.Itoa(j)+"]: "+err.Error(), http.StatusBadRequest) + return + } } for j, it := range req.Items { if err := idx.Add(it.ID, it.Vector, it.Metadata); err != nil { @@ -206,6 +290,9 @@ func (h *handlers) handleAdd(w http.ResponseWriter, r *http.Request) { return } } + // One save per batch (post-loop), not per item. Per scrum + // O-W4-style discipline: HTTP-batch boundary is the natural unit. + h.saveAfter(idx) writeJSON(w, http.StatusOK, map[string]any{"added": len(req.Items), "length": idx.Len()}) } diff --git a/internal/catalogd/registry_test.go b/internal/catalogd/registry_test.go index 6f2534a..b965f13 100644 --- a/internal/catalogd/registry_test.go +++ b/internal/catalogd/registry_test.go @@ -6,6 +6,8 @@ import ( "sync" "testing" "time" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient" ) // memStore is an in-memory Store fake for unit tests. @@ -30,7 +32,7 @@ func (m *memStore) Get(_ context.Context, key string) ([]byte, error) { defer m.mu.Unlock() b, ok := m.data[key] if !ok { - return nil, ErrKeyNotFound + return nil, storeclient.ErrKeyNotFound } return b, nil } diff --git a/internal/shared/config.go b/internal/shared/config.go index 3234825..5e298a3 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -60,10 +60,13 @@ type GatewayConfig struct { VectordURL string `toml:"vectord_url"` } -// VectordConfig adds vectord-specific knobs. 
Currently just bind; -// G1+ will add persistence-to-storaged URL + index params defaults. +// VectordConfig adds vectord-specific knobs. StoragedURL is +// optional — empty string disables persistence, useful for ephemeral +// dev or test runs. When set, indexes Save after every state change +// and Load on startup. type VectordConfig struct { - Bind string `toml:"bind"` + Bind string `toml:"bind"` + StoragedURL string `toml:"storaged_url"` } // QuerydConfig adds queryd-specific knobs. queryd talks DuckDB @@ -129,7 +132,10 @@ func DefaultConfig() Config { CatalogdURL: "http://127.0.0.1:3212", MaxIngestBytes: 256 << 20, // 256 MiB; bump per deployment via lakehouse.toml }, - Vectord: VectordConfig{Bind: "127.0.0.1:3215"}, + Vectord: VectordConfig{ + Bind: "127.0.0.1:3215", + StoragedURL: "http://127.0.0.1:3211", + }, Queryd: QuerydConfig{ Bind: "127.0.0.1:3214", CatalogdURL: "http://127.0.0.1:3212", diff --git a/internal/catalogd/store_client.go b/internal/storeclient/client.go similarity index 57% rename from internal/catalogd/store_client.go rename to internal/storeclient/client.go index 8d70a97..bab2e19 100644 --- a/internal/catalogd/store_client.go +++ b/internal/storeclient/client.go @@ -1,9 +1,10 @@ -// store_client.go — thin HTTP client to storaged. catalogd needs to -// PUT manifest Parquets, GET them on startup, and LIST the manifests -// directory. Staying inside an HTTP boundary (rather than reaching -// into storaged's package directly) preserves the service-boundary -// shape that gRPC will eventually formalize at G1+. -package catalogd +// Package storeclient is the shared HTTP client to storaged's +// /storage/* surface. catalogd uses Get/Put/List for manifest +// persistence; vectord (G1+) uses the same shape for HNSW index +// persistence. Extracting it here keeps the dep direction clean — +// a service that talks to storaged depends on this package, not +// on another service's package. 
+package storeclient import ( "bytes" @@ -17,15 +18,19 @@ import ( "time" ) -// StoreClient talks HTTP to the storaged service. -type StoreClient struct { +// ErrKeyNotFound mirrors storaged's not-found semantics on the +// caller side without exposing storaged's package types. +var ErrKeyNotFound = fmt.Errorf("storeclient: key not found") + +// Client talks HTTP to a storaged service. +type Client struct { baseURL string hc *http.Client } // listResponse mirrors storaged's GET /storage/list shape: // -// {"prefix":"_catalog/manifests/","objects":[{Key,Size,ETag,LastModified}, ...]} +// {"prefix":"_catalog/manifests/","objects":[{Key,Size,...}, ...]} type listResponse struct { Prefix string `json:"prefix"` Objects []struct { @@ -34,19 +39,18 @@ type listResponse struct { } `json:"objects"` } -// NewStoreClient builds a client against the given storaged base URL -// (e.g. "http://127.0.0.1:3211"). Timeout covers manifest read-write -// only; rehydration of many manifests at startup runs sequentially -// and each call gets its own timeout window. -func NewStoreClient(baseURL string) *StoreClient { - return &StoreClient{ +// New builds a client against the given storaged base URL +// (e.g. "http://127.0.0.1:3211"). Timeout covers a single op only; +// callers that drive many ops sequentially get a fresh window per call. +func New(baseURL string) *Client { + return &Client{ baseURL: strings.TrimRight(baseURL, "/"), hc: &http.Client{Timeout: 30 * time.Second}, } } -// Put writes raw bytes at key. body is the encoded Parquet manifest. -func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error { +// Put writes raw bytes at key. 
+func (c *Client) Put(ctx context.Context, key string, body []byte) error { u := c.baseURL + "/storage/put/" + safeKey(key) req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body)) if err != nil { @@ -65,8 +69,8 @@ func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error { return nil } -// Get reads the bytes at key. ErrKeyNotFound on 404. -func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) { +// Get reads the bytes at key. Returns ErrKeyNotFound on 404. +func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { u := c.baseURL + "/storage/get/" + safeKey(key) req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if err != nil { @@ -87,9 +91,29 @@ func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) { return io.ReadAll(resp.Body) } -// List returns the keys under prefix. Object metadata beyond Key is -// ignored — catalogd only needs the keys to drive rehydration. -func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) { +// Delete removes the key. Idempotent — a missing key is not an +// error (matches storaged's underlying S3 DeleteObject semantics). +func (c *Client) Delete(ctx context.Context, key string) error { + u := c.baseURL + "/storage/delete/" + safeKey(key) + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u, nil) + if err != nil { + return fmt.Errorf("delete req: %w", err) + } + resp, err := c.hc.Do(req) + if err != nil { + return fmt.Errorf("delete do: %w", err) + } + defer drainAndClose(resp.Body) + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return fmt.Errorf("delete %s: status %d: %s", key, resp.StatusCode, string(preview)) + } + return nil +} + +// List returns the keys under prefix. Caller-side filtering on +// suffix or other shape lives outside this package. 
+func (c *Client) List(ctx context.Context, prefix string) ([]string, error) { u := c.baseURL + "/storage/list?prefix=" + url.QueryEscape(prefix) req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if err != nil { @@ -115,10 +139,6 @@ func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) return out, nil } -// ErrKeyNotFound mirrors storaged's not-found semantics on the catalogd -// side without exposing storaged's package types. -var ErrKeyNotFound = fmt.Errorf("catalogd store: key not found") - // safeKey URL-escapes path segments while preserving "/". storaged's // chi `/storage//*` routes accept literal slashes in the // wildcard match, so we only escape the segments, not the separators. @@ -131,9 +151,8 @@ func safeKey(key string) string { } // drainAndClose reads any remaining body bytes (capped at 64 KiB) and -// closes the body. Per scrum S6 (Qwen): preview-then-close on error -// paths leaves bytes in the kernel buffer, breaking HTTP/1.1 keep- -// alive reuse and slowly leaking sockets. +// closes the body — keeps HTTP/1.1 keep-alive pool reuse healthy on +// error paths. func drainAndClose(body io.ReadCloser) { _, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10)) _ = body.Close() diff --git a/internal/vectord/index.go b/internal/vectord/index.go index 8c34083..b7da62c 100644 --- a/internal/vectord/index.go +++ b/internal/vectord/index.go @@ -12,6 +12,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "math" "sync" @@ -200,8 +201,15 @@ func (i *Index) resetGraphLocked() { i.g = g } +// ValidateVector is the exported form of validateVector — the HTTP +// handler pre-validates batches before committing, so it needs the +// same predicate Add uses internally. Per scrum O-I3 (G1P). +func ValidateVector(vec []float32, distance string) error { + return validateVector(vec, distance) +} + // validateVector rejects vectors that would poison the HNSW -// graph or produce NaN distances. Per scrum O-W3 (Opus). 
+// graph or produce NaN distances. Per scrum O-W3 (Opus, G1). func validateVector(vec []float32, distance string) error { var sumSq float64 for j, v := range vec { @@ -259,6 +267,75 @@ func (i *Index) Search(query []float32, k int) ([]Result, error) { return out, nil } +// IndexEnvelope is the JSON shape persisted alongside the binary +// HNSW graph bytes. params + metadata + format version travel +// together; the graph itself is opaque binary that round-trips +// through hnsw.Graph.Export / Import. +type IndexEnvelope struct { + Version int `json:"version"` + Params IndexParams `json:"params"` + Metadata map[string]json.RawMessage `json:"metadata"` +} + +// envelopeVersion bumps when the on-disk JSON shape changes +// incompatibly. Reading a future version returns ErrVersionMismatch +// rather than producing a half-decoded index. +const envelopeVersion = 1 + +// ErrVersionMismatch is returned by DecodeIndex when the envelope +// claims a version this build doesn't understand. +var ErrVersionMismatch = errors.New("vectord: unknown envelope version") + +// Encode writes the index's JSON envelope (params + metadata) and +// the binary HNSW graph bytes through two writers. Two-stream +// shape lets the persistor write each to a distinct storaged key +// without reframing. +// +// envelopeW receives params+metadata as JSON; graphW receives the +// raw output of hnsw.Graph.Export. +func (i *Index) Encode(envelopeW, graphW io.Writer) error { + i.mu.RLock() + defer i.mu.RUnlock() + + env := IndexEnvelope{ + Version: envelopeVersion, + Params: i.params, + Metadata: i.meta, + } + if err := json.NewEncoder(envelopeW).Encode(env); err != nil { + return fmt.Errorf("encode envelope: %w", err) + } + if err := i.g.Export(graphW); err != nil { + return fmt.Errorf("export graph: %w", err) + } + return nil +} + +// DecodeIndex reconstructs an Index from a previously-Encoded pair +// of streams. 
The returned Index is independent — closing either +// reader after this call doesn't affect the result. +func DecodeIndex(envelopeR, graphR io.Reader) (*Index, error) { + var env IndexEnvelope + if err := json.NewDecoder(envelopeR).Decode(&env); err != nil { + return nil, fmt.Errorf("decode envelope: %w", err) + } + if env.Version != envelopeVersion { + return nil, fmt.Errorf("%w: have %d, got %d", + ErrVersionMismatch, envelopeVersion, env.Version) + } + idx, err := NewIndex(env.Params) + if err != nil { + return nil, err + } + if err := idx.g.Import(graphR); err != nil { + return nil, fmt.Errorf("import graph: %w", err) + } + if env.Metadata != nil { + idx.meta = env.Metadata + } + return idx, nil +} + // Lookup returns the stored vector + metadata for an id. // // Per scrum O-W1 (Opus): the vector is COPIED before return. diff --git a/internal/vectord/index_test.go b/internal/vectord/index_test.go index 4de0be7..98796f4 100644 --- a/internal/vectord/index_test.go +++ b/internal/vectord/index_test.go @@ -1,6 +1,7 @@ package vectord import ( + "bytes" "encoding/json" "errors" "fmt" @@ -129,6 +130,65 @@ func TestIndex_ConcurrentSearchAdd(t *testing.T) { wg.Wait() } +func TestEncodeDecode_RoundTrip(t *testing.T) { + const n = 16 + src, _ := NewIndex(IndexParams{Name: "x", Dimension: n, Distance: DistanceCosine}) + mkVec := func(i int) []float32 { + // Each vector is a unique unit vector along axis (i mod n) with + // a tiny perturbation on a different axis — recall=1 is robust + // without colliding under cosine. 
+ v := make([]float32, n) + v[i%n] = 1.0 + v[(i+1)%n] = 0.001 + return v + } + for i := 0; i < n; i++ { + meta := json.RawMessage(fmt.Sprintf(`{"row":%d}`, i)) + if err := src.Add(fmt.Sprintf("id-%02d", i), mkVec(i), meta); err != nil { + t.Fatal(err) + } + } + + var envBuf, graphBuf bytes.Buffer + if err := src.Encode(&envBuf, &graphBuf); err != nil { + t.Fatalf("Encode: %v", err) + } + + dst, err := DecodeIndex(&envBuf, &graphBuf) + if err != nil { + t.Fatalf("DecodeIndex: %v", err) + } + if dst.Len() != src.Len() { + t.Errorf("Len: src=%d dst=%d", src.Len(), dst.Len()) + } + if dst.Params() != src.Params() { + t.Errorf("Params: src=%+v dst=%+v", src.Params(), dst.Params()) + } + for i := 0; i < n; i++ { + hits, err := dst.Search(mkVec(i), 1) + if err != nil { + t.Fatal(err) + } + want := fmt.Sprintf("id-%02d", i) + if len(hits) == 0 || hits[0].ID != want { + t.Errorf("Search after decode: id-%d → %v, want %s", i, hits, want) + continue + } + wantMeta := fmt.Sprintf(`{"row":%d}`, i) + if string(hits[0].Metadata) != wantMeta { + t.Errorf("metadata after decode: got %q, want %q", hits[0].Metadata, wantMeta) + } + } +} + +func TestDecodeIndex_VersionMismatch(t *testing.T) { + bad := bytes.NewBufferString(`{"version":999,"params":{"name":"x","dimension":4}}`) + _, err := DecodeIndex(bad, bytes.NewReader(nil)) + if !errors.Is(err, ErrVersionMismatch) { + t.Errorf("expected ErrVersionMismatch, got %v", err) + } +} + func TestRegistry_CreateGetDelete(t *testing.T) { r := NewRegistry() idx, err := r.Create(IndexParams{Name: "workers", Dimension: 4}) diff --git a/internal/vectord/persistor.go b/internal/vectord/persistor.go new file mode 100644 index 0000000..ef6d29f --- /dev/null +++ b/internal/vectord/persistor.go @@ -0,0 +1,184 @@ +// persistor.go — drives Index ↔ storaged round-trips for vectord. 
+// Single-file framed format per index (post-G1P-scrum C1 fix — +// the prior two-file format had a torn-write class where a +// successful envelope PUT followed by a failed graph PUT would +// pass the "both present" List filter and load mismatched state): +// +// _vectors/.lhv1 — single binary blob, framed: +// [4 bytes magic "LHV1"] +// [4 bytes envelope_len uint32 big-endian] +// [envelope bytes — JSON envelope from Index.Encode] +// [graph bytes — rest of file, raw hnsw.Graph.Export] +// +// One Put → no torn-write window. One Get → no need to filter +// half-saved orphans on List. +package vectord + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "sort" + "strings" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient" +) + +// VectorPrefix is the storaged key prefix that holds vector index +// state. Objects under this prefix are NOT user vectors — those +// live in the index in memory; this is the persistence sidecar. +const VectorPrefix = "_vectors/" + +// fileSuffix is the extension for the framed combined file. +// Versioned in the magic+frame so future formats can land +// alongside without rewriting all existing files. +const fileSuffix = ".lhv1" + +// magic is the 4-byte file header. Lakehouse-go Vectord v1. +var magic = [4]byte{'L', 'H', 'V', '1'} + +// Store is the subset of the storeclient API that persistor needs. +// Defined as an interface so unit tests can inject a fake without +// spinning up real storaged. +type Store interface { + Put(ctx context.Context, key string, body []byte) error + Get(ctx context.Context, key string) ([]byte, error) + Delete(ctx context.Context, key string) error + List(ctx context.Context, prefix string) ([]string, error) +} + +// ErrKeyMissing is returned by Load when the persisted file is +// absent. 
The single-file format means partial persistence isn't +// a concern (post-G1P-scrum C1) — torn-write would orphan a file +// with bad framing, surfacing as a parse error not a silent miss. +var ErrKeyMissing = errors.New("persistor: index file missing") + +// ErrBadFormat is returned by Load when the persisted bytes don't +// match the expected magic or framing. Possible causes: bit-rot, +// version skew (a future format we don't speak), partial PUT +// before storaged grew transactional writes, or operator-edited. +var ErrBadFormat = errors.New("persistor: bad file format") + +// Persistor wires Index Encode/Decode to a Store. +type Persistor struct { + store Store +} + +func NewPersistor(s Store) *Persistor { + return &Persistor{store: s} +} + +// Save encodes idx into a single framed blob and writes it. +// Atomic at the storaged layer: one Put → no torn-write hazard. +func (p *Persistor) Save(ctx context.Context, idx *Index) error { + var envBuf, graphBuf bytes.Buffer + if err := idx.Encode(&envBuf, &graphBuf); err != nil { + return fmt.Errorf("encode %q: %w", idx.params.Name, err) + } + body := frame(envBuf.Bytes(), graphBuf.Bytes()) + if err := p.store.Put(ctx, fileKey(idx.params.Name), body); err != nil { + return fmt.Errorf("put: %w", err) + } + return nil +} + +// Load reconstructs an Index from the persisted single-file blob. +// ErrKeyMissing if the file is absent; ErrBadFormat on framing +// mismatch (corruption / version skew / operator edit). +func (p *Persistor) Load(ctx context.Context, name string) (*Index, error) { + body, err := p.store.Get(ctx, fileKey(name)) + if err != nil { + // Per scrum O-B1 (Opus): use errors.Is on the typed + // sentinel, not substring matching. The prior substring + // approach silently misclassified any 5xx body that + // happened to contain "key not found" as missing. 
+ if errors.Is(err, storeclient.ErrKeyNotFound) { + return nil, fmt.Errorf("%w: %q", ErrKeyMissing, name) + } + return nil, fmt.Errorf("get %q: %w", name, err) + } + envBytes, graphBytes, err := unframe(body) + if err != nil { + return nil, fmt.Errorf("unframe %q: %w", name, err) + } + idx, err := DecodeIndex(bytes.NewReader(envBytes), bytes.NewReader(graphBytes)) + if err != nil { + return nil, fmt.Errorf("decode %q: %w", name, err) + } + return idx, nil +} + +// Delete removes the persisted file for an index. Idempotent on +// missing — storaged DELETE is itself idempotent. +func (p *Persistor) Delete(ctx context.Context, name string) error { + if err := p.store.Delete(ctx, fileKey(name)); err != nil { + return fmt.Errorf("delete %q: %w", name, err) + } + return nil +} + +// List returns persisted index names, sorted. Single-file format +// means no half-saved-orphan filtering needed — bad framing on Load +// surfaces as ErrBadFormat per index, which the rehydrate caller +// can log and skip. +func (p *Persistor) List(ctx context.Context) ([]string, error) { + keys, err := p.store.List(ctx, VectorPrefix) + if err != nil { + return nil, fmt.Errorf("list: %w", err) + } + out := make([]string, 0, len(keys)) + for _, k := range keys { + if !strings.HasPrefix(k, VectorPrefix) || !strings.HasSuffix(k, fileSuffix) { + continue + } + name := strings.TrimSuffix(strings.TrimPrefix(k, VectorPrefix), fileSuffix) + if name != "" { + out = append(out, name) + } + } + sort.Strings(out) + return out, nil +} + +func fileKey(name string) string { return VectorPrefix + name + fileSuffix } + +// frame produces the on-disk byte layout: +// +// [4 bytes magic "LHV1"] +// [4 bytes envelope_len uint32 big-endian] +// [envelope bytes] +// [graph bytes — rest of file] +func frame(envBytes, graphBytes []byte) []byte { + out := make([]byte, 0, 8+len(envBytes)+len(graphBytes)) + out = append(out, magic[:]...) 
+ var lenBuf [4]byte + binary.BigEndian.PutUint32(lenBuf[:], uint32(len(envBytes))) + out = append(out, lenBuf[:]...) + out = append(out, envBytes...) + out = append(out, graphBytes...) + return out +} + +// unframe reverses frame. Returns ErrBadFormat on any mismatch. +func unframe(body []byte) (envBytes, graphBytes []byte, err error) { + if len(body) < 8 { + return nil, nil, fmt.Errorf("%w: body too short (%d bytes)", ErrBadFormat, len(body)) + } + if !bytes.Equal(body[:4], magic[:]) { + return nil, nil, fmt.Errorf("%w: bad magic %q", ErrBadFormat, body[:4]) + } + envLen := binary.BigEndian.Uint32(body[4:8]) + if int(envLen)+8 > len(body) { + return nil, nil, fmt.Errorf("%w: envelope_len=%d exceeds body=%d", ErrBadFormat, envLen, len(body)) + } + envBytes = body[8 : 8+envLen] + graphBytes = body[8+envLen:] + return envBytes, graphBytes, nil +} + +// (io import retained for future Save/Load that streams; current +// path is buffer-based.) +var _ = io.Discard diff --git a/internal/vectord/persistor_test.go b/internal/vectord/persistor_test.go new file mode 100644 index 0000000..91e5832 --- /dev/null +++ b/internal/vectord/persistor_test.go @@ -0,0 +1,157 @@ +package vectord + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "testing" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/storeclient" +) + +// memStore is an in-memory Store fake for unit tests. Returns the +// real storeclient.ErrKeyNotFound sentinel so persistor.Load's +// errors.Is path is exercised faithfully. 
+type memStore struct { + mu sync.Mutex + data map[string][]byte +} + +func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} } + +func (m *memStore) Put(_ context.Context, key string, body []byte) error { + m.mu.Lock() + defer m.mu.Unlock() + cp := make([]byte, len(body)) + copy(cp, body) + m.data[key] = cp + return nil +} + +func (m *memStore) Get(_ context.Context, key string) ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + b, ok := m.data[key] + if !ok { + return nil, storeclient.ErrKeyNotFound + } + return b, nil +} + +func (m *memStore) Delete(_ context.Context, key string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.data, key) + return nil +} + +func (m *memStore) List(_ context.Context, prefix string) ([]string, error) { + m.mu.Lock() + defer m.mu.Unlock() + out := []string{} + for k := range m.data { + if strings.HasPrefix(k, prefix) { + out = append(out, k) + } + } + return out, nil +} + +func TestPersistor_SaveLoad_RoundTrip(t *testing.T) { + const n = 16 + store := newMemStore() + p := NewPersistor(store) + + src, _ := NewIndex(IndexParams{Name: "workers", Dimension: n, Distance: DistanceCosine}) + for i := 0; i < n; i++ { + v := make([]float32, n) + v[i%n] = 1.0 + v[(i+1)%n] = 0.001 + _ = src.Add(fmt.Sprintf("w-%02d", i), v, json.RawMessage(fmt.Sprintf(`{"i":%d}`, i))) + } + if err := p.Save(context.Background(), src); err != nil { + t.Fatal(err) + } + + dst, err := p.Load(context.Background(), "workers") + if err != nil { + t.Fatalf("Load: %v", err) + } + if dst.Len() != src.Len() { + t.Errorf("Len: src=%d dst=%d", src.Len(), dst.Len()) + } + for i := 0; i < n; i++ { + v := make([]float32, n) + v[i%n] = 1.0 + v[(i+1)%n] = 0.001 + hits, _ := dst.Search(v, 1) + want := fmt.Sprintf("w-%02d", i) + if len(hits) == 0 || hits[0].ID != want { + t.Errorf("recall after Load: i=%d hits=%v", i, hits) + } + } +} + +func TestPersistor_Load_MissingReturnsErrKeyMissing(t *testing.T) { + p := NewPersistor(newMemStore()) + _, err 
:= p.Load(context.Background(), "nope") + if !errors.Is(err, ErrKeyMissing) { + t.Errorf("expected ErrKeyMissing, got %v", err) + } +} + +func TestPersistor_Delete_RemovesBothFiles(t *testing.T) { + store := newMemStore() + p := NewPersistor(store) + src, _ := NewIndex(IndexParams{Name: "x", Dimension: 4}) + _ = src.Add("a", []float32{1, 0, 0, 0}, nil) + _ = p.Save(context.Background(), src) + + if err := p.Delete(context.Background(), "x"); err != nil { + t.Fatal(err) + } + if _, err := p.Load(context.Background(), "x"); !errors.Is(err, ErrKeyMissing) { + t.Errorf("after Delete, Load should ErrKeyMissing, got %v", err) + } +} + +func TestPersistor_List_FiltersBySuffix(t *testing.T) { + // Single-file format means no orphan-pair concept; we just + // filter on the .lhv1 suffix. Garbage files under VectorPrefix + // (e.g. operator drops something there) shouldn't show up. + store := newMemStore() + p := NewPersistor(store) + + src, _ := NewIndex(IndexParams{Name: "alpha", Dimension: 4}) + _ = src.Add("a", []float32{1, 0, 0, 0}, nil) + _ = p.Save(context.Background(), src) + + src2, _ := NewIndex(IndexParams{Name: "beta", Dimension: 4}) + _ = src2.Add("b", []float32{0, 1, 0, 0}, nil) + _ = p.Save(context.Background(), src2) + + // A garbage file under the prefix that shouldn't match. + _ = store.Put(context.Background(), VectorPrefix+"README.txt", []byte("not an index")) + + got, err := p.List(context.Background()) + if err != nil { + t.Fatal(err) + } + if len(got) != 2 || got[0] != "alpha" || got[1] != "beta" { + t.Errorf("List: got %v, want [alpha beta]", got) + } +} + +func TestPersistor_Load_BadFormat(t *testing.T) { + store := newMemStore() + p := NewPersistor(store) + // Manually plant a file with bad magic → Load surfaces ErrBadFormat. 
+ _ = store.Put(context.Background(), fileKey("corrupt"), []byte("not lhv1 framing")) + _, err := p.Load(context.Background(), "corrupt") + if !errors.Is(err, ErrBadFormat) { + t.Errorf("expected ErrBadFormat, got %v", err) + } +} diff --git a/internal/vectord/registry.go b/internal/vectord/registry.go index 02feacc..a50b709 100644 --- a/internal/vectord/registry.go +++ b/internal/vectord/registry.go @@ -47,6 +47,21 @@ func (r *Registry) Create(p IndexParams) (*Index, error) { return idx, nil } +// RegisterPrebuilt installs an already-built Index under its +// params.Name. Used by the persistor's rehydrate path — the index +// has been Decoded from disk and shouldn't be re-built fresh by +// Create. Returns ErrIndexAlreadyExists if the name is taken. +func (r *Registry) RegisterPrebuilt(idx *Index) error { + r.mu.Lock() + defer r.mu.Unlock() + name := idx.Params().Name + if _, exists := r.indexes[name]; exists { + return fmt.Errorf("%w: %q", ErrIndexAlreadyExists, name) + } + r.indexes[name] = idx + return nil +} + // Get returns the index for name, or ErrIndexNotFound. func (r *Registry) Get(name string) (*Index, error) { r.mu.RLock() diff --git a/lakehouse.toml b/lakehouse.toml index 99b31e2..a6ce460 100644 --- a/lakehouse.toml +++ b/lakehouse.toml @@ -30,6 +30,8 @@ max_ingest_bytes = 268435456 [vectord] bind = "127.0.0.1:3215" +# Optional — set to empty string to disable persistence (dev/test). +storaged_url = "http://127.0.0.1:3211" [queryd] bind = "127.0.0.1:3214" diff --git a/scripts/g1_smoke.sh b/scripts/g1_smoke.sh index 36dacb1..f728916 100755 --- a/scripts/g1_smoke.sh +++ b/scripts/g1_smoke.sh @@ -46,10 +46,10 @@ poll_health() { } echo "[g1-smoke] launching vectord → gateway..." -./bin/vectord > /tmp/vectord.log 2>&1 & +./bin/vectord -config scripts/g1_smoke.toml > /tmp/vectord.log 2>&1 & PIDS+=($!) 
poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; exit 1; } -./bin/gateway > /tmp/gateway.log 2>&1 & +./bin/gateway -config scripts/g1_smoke.toml > /tmp/gateway.log 2>&1 & PIDS+=($!) poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; } @@ -81,14 +81,15 @@ echo "[g1-smoke] add batch of 200 vectors:" # Build a batch payload with 200 items. ID encodes index; vector # is a deterministic spread so we can assert recall on a known id. python3 - < "$TMP/batch.json" -import json, math +import json items = [] for i in range(200): - # Each vector is unit-norm with one dominant axis cycling through 8 dims. + # Each vector is unit-norm-ish with one dominant axis cycling + # through DIM dims, and a UNIQUE perturbation derived from i itself + # (no modulo — would create collisions when 200/DIM rolls over). v = [0.0] * $DIM v[i % $DIM] = 1.0 - # Add a tiny perturbation per id so vectors are distinct. - v[(i+1) % $DIM] = (i % 17) * 0.01 + v[(i+1) % $DIM] = i * 0.0001 # unique per i items.append({"id": f"w-{i:03d}", "vector": v, "metadata": {"row": i}}) print(json.dumps({"items": items})) EOF @@ -110,7 +111,7 @@ import json i = 42 v = [0.0] * $DIM v[i % $DIM] = 1.0 -v[(i+1) % $DIM] = (i % 17) * 0.01 +v[(i+1) % $DIM] = i * 0.0001 # match the unique-per-i perturbation print(json.dumps({"vector": v, "k": 3})) EOF SEARCH_RESP="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \ diff --git a/scripts/g1_smoke.toml b/scripts/g1_smoke.toml new file mode 100644 index 0000000..95e7b7d --- /dev/null +++ b/scripts/g1_smoke.toml @@ -0,0 +1,24 @@ +# g1_smoke override — disables vectord persistence so the in-memory +# API test stays pure (no rehydrate-from-storaged contamination). +# g1p_smoke covers the persistence path. 
+
+[gateway]
+bind = "127.0.0.1:3110"
+storaged_url = "http://127.0.0.1:3211"
+catalogd_url = "http://127.0.0.1:3212"
+ingestd_url = "http://127.0.0.1:3213"
+queryd_url = "http://127.0.0.1:3214"
+vectord_url = "http://127.0.0.1:3215"
+
+[storaged]
+bind = "127.0.0.1:3211"
+
+[vectord]
+bind = "127.0.0.1:3215"
+storaged_url = "" # disable persistence — pure in-memory for this smoke
+
+[s3]
+endpoint = "http://localhost:9000"
+region = "us-east-1"
+bucket = "lakehouse-go-primary"
+use_path_style = true
diff --git a/scripts/g1p_smoke.sh b/scripts/g1p_smoke.sh
new file mode 100755
index 0000000..5da764f
--- /dev/null
+++ b/scripts/g1p_smoke.sh
@@ -0,0 +1,167 @@
+#!/usr/bin/env bash
+# G1P smoke — vectord persistence across restart.
+#
+# Validates:
+# - Create index + add 50 vectors → Save fires (storaged shows
+#   the single persistence file _vectors/persist_demo.lhv1)
+# - Search w-001 → recall=1 with distance ≈ 0
+# - Kill vectord → restart → initial Refresh enumerates persisted
+#   indexes, Loads each back into the registry
+# - Same search post-restart → still recall=1 (state survived)
+# - DELETE → persistence file removed from storaged → restart → index gone
+#
+# Builds and launches storaged, vectord, and gateway itself.
+#
+# Usage: ./scripts/g1p_smoke.sh
+
+set -euo pipefail
+cd "$(dirname "$0")/.."
+
+export PATH="$PATH:/usr/local/go/bin"
+
+echo "[g1p-smoke] building storaged + vectord + gateway..."
+go build -o bin/ ./cmd/storaged ./cmd/vectord ./cmd/gateway + +pkill -f "bin/(storaged|vectord|gateway)" 2>/dev/null || true +sleep 0.3 + +PIDS=() +TMP="$(mktemp -d)" +cleanup() { + echo "[g1p-smoke] cleanup" + for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +poll_health() { + local port="$1" deadline=$(($(date +%s) + 5)) + while [ "$(date +%s)" -lt "$deadline" ]; do + if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi + sleep 0.05 + done + return 1 +} + +NAME="persist_demo" +DIM=8 + +echo "[g1p-smoke] launching storaged..." +./bin/storaged > /tmp/storaged.log 2>&1 & +PIDS+=($!) +poll_health 3211 || { echo "storaged failed"; tail /tmp/storaged.log; exit 1; } + +# Clean any prior persisted state for this index name. +curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/_vectors/$NAME.lhv1" || true + +launch_vectord() { + ./bin/vectord > /tmp/vectord.log 2>&1 & + VECTORD_PID=$! + PIDS+=($VECTORD_PID) + poll_health 3215 || { echo "vectord failed"; tail /tmp/vectord.log; return 1; } +} + +launch_gateway() { + ./bin/gateway > /tmp/gateway.log 2>&1 & + PIDS+=($!) + poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; return 1; } +} + +echo "[g1p-smoke] launching vectord (round 1) → gateway..." +launch_vectord +launch_gateway + +FAILED=0 + +echo "[g1p-smoke] create index + add 50 vectors:" +HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/vectors/index \ + -H 'Content-Type: application/json' \ + -d "{\"name\":\"$NAME\",\"dimension\":$DIM,\"distance\":\"cosine\"}")" +if [ "$HTTP" != "201" ]; then echo " ✗ create → $HTTP"; FAILED=1; fi + +# Build a 50-vector batch. 
+python3 - < "$TMP/batch.json" +import json +items = [] +for i in range(50): + v = [0.0] * $DIM + v[i % $DIM] = 1.0 + v[(i+1) % $DIM] = (i % 13) * 0.01 + items.append({"id": f"w-{i:03d}", "vector": v, "metadata": {"row": i}}) +print(json.dumps({"items": items})) +EOF +LEN="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/add" \ + -H 'Content-Type: application/json' \ + -d @"$TMP/batch.json" | jq -r '.length')" +if [ "$LEN" = "50" ]; then echo " ✓ added 50 → length=50"; else echo " ✗ add → length=$LEN"; FAILED=1; fi + +echo "[g1p-smoke] verify storaged has the persistence file:" +FILE_KEY="_vectors/$NAME.lhv1" +FILE_HTTP="$(curl -sS -o /dev/null -w '%{http_code}' "http://127.0.0.1:3211/storage/get/$FILE_KEY")" +if [ "$FILE_HTTP" = "200" ]; then + echo " ✓ $FILE_KEY present in storaged" +else + echo " ✗ file=$FILE_HTTP"; FAILED=1 +fi + +echo "[g1p-smoke] search pre-restart:" +python3 - < "$TMP/q.json" +import json +i = 1 +v = [0.0] * $DIM +v[i % $DIM] = 1.0 +v[(i+1) % $DIM] = (i % 13) * 0.01 +print(json.dumps({"vector": v, "k": 1})) +EOF +TOP_PRE="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \ + -H 'Content-Type: application/json' -d @"$TMP/q.json" | jq -r '.results[0].id')" +if [ "$TOP_PRE" = "w-001" ]; then echo " ✓ pre-restart top hit = w-001"; else echo " ✗ top=$TOP_PRE"; FAILED=1; fi + +echo "[g1p-smoke] kill + restart vectord (rehydrate path):" +kill $VECTORD_PID 2>/dev/null || true +wait $VECTORD_PID 2>/dev/null || true +sleep 0.3 +launch_vectord +sleep 0.2 # let initial Refresh complete (no API to query rehydration done) + +echo "[g1p-smoke] vectord rehydrated index list shows $NAME:" +NAMES_HTTP="$(curl -sS http://127.0.0.1:3110/v1/vectors/index | jq -r '.names | length')" +if [ "$NAMES_HTTP" = "1" ]; then echo " ✓ list count=1 after restart"; else echo " ✗ count=$NAMES_HTTP"; FAILED=1; fi + +INFO_LEN="$(curl -sS "http://127.0.0.1:3110/v1/vectors/index/$NAME" | jq -r '.length')" +if [ "$INFO_LEN" = "50" ]; then 
echo " ✓ length=50 after restart (state survived)"; else echo " ✗ length=$INFO_LEN"; FAILED=1; fi + +echo "[g1p-smoke] search post-restart:" +TOP_POST="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \ + -H 'Content-Type: application/json' -d @"$TMP/q.json" | jq -r '.results[0].id')" +DIST_POST="$(curl -sS -X POST "http://127.0.0.1:3110/v1/vectors/index/$NAME/search" \ + -H 'Content-Type: application/json' -d @"$TMP/q.json" | jq -r '.results[0].distance')" +if [ "$TOP_POST" = "w-001" ]; then + echo " ✓ post-restart top hit = w-001 (dist=$DIST_POST)" +else + echo " ✗ post-restart top=$TOP_POST"; FAILED=1 +fi + +echo "[g1p-smoke] DELETE then restart → index gone:" +curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3110/v1/vectors/index/$NAME" +FILE_HTTP="$(curl -sS -o /dev/null -w '%{http_code}' "http://127.0.0.1:3211/storage/get/$FILE_KEY")" +if [ "$FILE_HTTP" = "404" ]; then + echo " ✓ persistence file removed from storaged" +else + echo " ✗ file=$FILE_HTTP after delete"; FAILED=1 +fi +kill $VECTORD_PID 2>/dev/null || true +wait $VECTORD_PID 2>/dev/null || true +sleep 0.3 +launch_vectord +sleep 0.2 +NAMES_HTTP="$(curl -sS http://127.0.0.1:3110/v1/vectors/index | jq -r '.names | length')" +if [ "$NAMES_HTTP" = "0" ]; then echo " ✓ post-delete restart list count=0"; else echo " ✗ count=$NAMES_HTTP"; FAILED=1; fi + +if [ "$FAILED" -eq 0 ]; then + echo "[g1p-smoke] G1P acceptance gate: PASSED" + exit 0 +else + echo "[g1p-smoke] G1P acceptance gate: FAILED" + exit 1 +fi