root 8b92518d21 G1P: vectord persistence to storaged + scrum (3 fixes incl. 3-way convergent)
Adds optional persistence to vectord (G1's HNSW vector search). Single-
file framed format per index — eliminates the torn-write class that
the 3-way convergent scrum finding identified:

  _vectors/<name>.lhv1  — single binary blob:
      [4 bytes magic "LHV1"]
      [4 bytes envelope_len uint32 BE]
      [envelope bytes — JSON params + metadata + version]
      [graph bytes — raw hnsw.Graph.Export]
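
A minimal sketch of how the framed layout above could be written and
read back. The names encodeFrame and decodeFrame are hypothetical and
not taken from vectord; the envelope and graph are treated as opaque
byte slices, so nothing here depends on the real hnsw export format.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// headerLen covers the 4-byte magic plus the 4-byte envelope length.
const headerLen = 8

var magic = []byte("LHV1")

// encodeFrame (hypothetical name) packs envelope and graph into one
// blob: magic, big-endian uint32 envelope length, envelope, graph.
func encodeFrame(envelope, graph []byte) []byte {
	buf := make([]byte, 0, headerLen+len(envelope)+len(graph))
	buf = append(buf, magic...)
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(envelope)))
	buf = append(buf, envelope...)
	buf = append(buf, graph...)
	return buf
}

// decodeFrame (hypothetical name) splits a blob back into envelope and
// graph. Everything after the envelope is treated as graph bytes.
func decodeFrame(blob []byte) (envelope, graph []byte, err error) {
	if len(blob) < headerLen || !bytes.Equal(blob[:4], magic) {
		return nil, nil, errors.New("lhv1: bad magic or short header")
	}
	n := binary.BigEndian.Uint32(blob[4:headerLen])
	if uint64(n) > uint64(len(blob)-headerLen) {
		return nil, nil, fmt.Errorf("lhv1: envelope_len %d exceeds payload", n)
	}
	end := headerLen + int(n)
	return blob[headerLen:end], blob[end:], nil
}

func main() {
	blob := encodeFrame([]byte(`{"name":"demo"}`), []byte{1, 2, 3})
	env, graph, err := decodeFrame(blob)
	fmt.Println(string(env), graph, err)
}
```

Because both parts travel in one blob, a single PUT either stores a
complete index or nothing, which is the property the commit relies on.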

Pre-extraction: moved internal/catalogd/store_client.go into the shared
internal/storeclient/ package, since both catalogd and vectord need it.
Same pattern as the pre-D5 catalogclient extraction.

Optional via [vectord].storaged_url config (empty = ephemeral mode).
On startup: List + Load each persisted index. After Create / batch Add /
DELETE: Save (or Delete from storaged). Save failures are logged, not
fatal: while the process is running, in-memory state is the source of
truth.

Acceptance smoke G1P 8/8 PASS — kill+restart preserves state, post-
restart search returns dist=0 (graph round-trips exactly), DELETE
removes the file, post-delete restart shows count=0.

All 8 smokes (D1-D6 + G1 + G1P) PASS deterministically. g1_smoke now
has its own scripts/g1_smoke.toml, which disables persistence so the
in-memory API test cannot be contaminated by state rehydrated from
storaged.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                     1 BLOCK + 5 WARN + 3 INFO
  - Kimi K2-0905 (openrouter):               1 BLOCK + 2 WARN
  - Qwen3-coder (openrouter):                2 BLOCK + 2 WARN + 1 INFO

Fixed (3 — 1 convergent + 2 single-reviewer):
  C1 (Opus + Kimi + Qwen 3-WAY CONVERGENT WARN): Save was non-atomic
    across two PUTs — envelope-succeeds + graph-fails left a half-
    saved index that passed the "both present" List filter and
    silently mismatched metadata against vectors on Load.
    Fix: collapse to single framed file (no torn-write window
    possible).
  O-B1 (Opus BLOCK): isNotFound substring-matched "key not found"
    against the wrapped error message — brittle, any 5xx body
    containing that text would silently misclassify as missing.
    Fix: errors.Is(err, storeclient.ErrKeyNotFound).
  O-I3 (Opus INFO): handleAdd pre-validation only covered id+dim;
    NaN/Inf/zero-norm could still fail mid-batch leaving partial
    commits. Fix: extend pre-validation to call ValidateVector
    (newly exported) per item before any commit.

Dismissed (3 false positives):
  K-B1 + Q-B1 ("safeKey double-escapes %2F segments") — false
    convergent. Wire-protocol escape is decoded by storaged's chi
    router on the way in; on-disk key is the original literal.
    %2F round-trips correctly through PathEscape → URL → chi decode
    → S3 key.
  Q-B2 ("List vulnerable to race conditions") — vectord is single-
    process; no concurrent Save against List in the same vectord.
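
The escape round trip behind the K-B1 + Q-B1 dismissal can be checked
with the standard library alone. This sketch only uses net/url; it
assumes the server side performs the equivalent of one PathUnescape on
the path segment, and roundTrip is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/url"
)

// roundTrip (hypothetical helper) escapes a key for use as one URL
// path segment and decodes it once, as a router would on the way in.
func roundTrip(key string) (wire, decoded string, err error) {
	wire = url.PathEscape(key)
	decoded, err = url.PathUnescape(wire)
	return wire, decoded, err
}

func main() {
	for _, key := range []string{"tenant/a", "idx%2Fname"} {
		wire, decoded, err := roundTrip(key)
		fmt.Printf("%q -> %q -> %q (err=%v)\n", key, wire, decoded, err)
	}
}
```

A key containing a literal %2F goes out as %252F and comes back as the
original literal, so one escape paired with one decode never
double-escapes.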

Deferred (3): rehydrate per-index timeout (G2+ multi-index scale),
saveAfter request ctx (matches G0 timeout deferral), Encode RLock
during slow writer (documented as buffer-only API).

The C1 finding is the strongest signal of the cross-lineage filter:
three independent reviewers all flagged the same torn-write hazard.
Single-file framing eliminates the class — there's now no Persistor
state where envelope and graph can disagree.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 01:33:23 -05:00


// registry.go — multi-index manager. Mirrors the catalogd Registry
// shape: a thread-safe map[name]*Index with Create / Get / Delete.
// Per-index operations (Add, Search) go through each Index's own
// RWMutex so registry-wide locking only fires on lifecycle events.
package vectord

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// ErrIndexNotFound is returned by Get / Delete when the requested
// name has no registered index.
var ErrIndexNotFound = errors.New("vectord: index not found")
// ErrIndexAlreadyExists is returned by Create when the name is
// taken. Callers can treat this as a 409 Conflict — paralleling
// catalogd's ADR-020 idempotency contract, but stricter (no
// "same params reuses index" semantics yet).
var ErrIndexAlreadyExists = errors.New("vectord: index already exists")
// Registry holds the live indexes by name.
type Registry struct {
	mu      sync.RWMutex
	indexes map[string]*Index
}

// NewRegistry returns an empty Registry ready for use.
func NewRegistry() *Registry {
	return &Registry{indexes: make(map[string]*Index)}
}
// Create builds a new Index from params and registers it under
// params.Name. Returns ErrIndexAlreadyExists if the name is taken.
func (r *Registry) Create(p IndexParams) (*Index, error) {
	idx, err := NewIndex(p)
	if err != nil {
		return nil, err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.indexes[p.Name]; exists {
		return nil, fmt.Errorf("%w: %q", ErrIndexAlreadyExists, p.Name)
	}
	r.indexes[p.Name] = idx
	return idx, nil
}
// RegisterPrebuilt installs an already-built Index under its
// params.Name. Used by the persistor's rehydrate path — the index
// has been Decoded from disk and shouldn't be re-built fresh by
// Create. Returns ErrIndexAlreadyExists if the name is taken.
func (r *Registry) RegisterPrebuilt(idx *Index) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	name := idx.Params().Name
	if _, exists := r.indexes[name]; exists {
		return fmt.Errorf("%w: %q", ErrIndexAlreadyExists, name)
	}
	r.indexes[name] = idx
	return nil
}
// Get returns the index for name, or ErrIndexNotFound.
func (r *Registry) Get(name string) (*Index, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	idx, ok := r.indexes[name]
	if !ok {
		return nil, fmt.Errorf("%w: %q", ErrIndexNotFound, name)
	}
	return idx, nil
}
// Delete removes the index for name. Returns ErrIndexNotFound if the
// name is not registered, so callers can tell a no-op apart from a
// successful removal on the idempotent path.
func (r *Registry) Delete(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.indexes[name]; !ok {
		return fmt.Errorf("%w: %q", ErrIndexNotFound, name)
	}
	delete(r.indexes, name)
	return nil
}
// Names returns the registered index names sorted ascending —
// stable enumeration for /v1/vectors GET listings.
func (r *Registry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.indexes))
	for name := range r.indexes {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}