diff --git a/docs/DECISIONS.md b/docs/DECISIONS.md
index 5b5afdf..86f3af6 100644
--- a/docs/DECISIONS.md
+++ b/docs/DECISIONS.md
@@ -242,6 +242,123 @@ need rotate-without-restart.
 
 ---
 
-(Future ADRs from ADR-004 onward will be added as the Go
-implementation accrues design decisions — e.g. HNSW parameter
-choices, pathway-memory hash function, auditor model rotation, etc.)
+## ADR-004: Pathway memory data model — Mem0-style versioned traces
+**Date:** 2026-04-29
+**Decided by:** J + Claude
+**Status:** Decided — substrate landing in `internal/pathway/`
+
+**Decision:** Pathway memory is an append-only event log of opaque
+traces with Mem0-style semantics: Add / Update / Revise / Retire /
+History / Search. Each trace has a UID; revisions chain backward
+via `predecessor_uid`, so the full history is reconstructible.
+Persistence is JSONL append-only with full replay on load;
+corruption recovery skips bad lines without halting startup.
+
+### Operations
+
+| Op | Effect |
+|---|---|
+| `Add(content, tags...)` | New UID, stored fresh, replay_count=1. |
+| `AddIdempotent(uid, content, tags...)` | If UID exists → replay_count++. Else → Add with that UID. |
+| `Update(uid, content)` | In-place content replacement (same UID). Bumps `updated_at_ns`. NOT a revision — same trace, new content. |
+| `Revise(predecessorUID, content, tags...)` | New UID with `predecessor_uid` set. Old trace stays accessible via History. Failure modes: predecessor missing → error; predecessor retired → still allowed (revisions of retired traces are valid). |
+| `Retire(uid)` | Sets `retired=true`. Excluded from `Search` by default; still accessible via `Get` and `History`. |
+| `Get(uid)` | Returns the trace (including if retired); error on missing. |
+| `History(uid)` | Walks the `predecessor_uid` chain backward, returns slice [self, parent, grandparent, ...]. Cycle-detected via visited-set; returns error on cycle (which only happens if the persistence file was hand-edited). |
+| `Search(filter)` | Returns matching traces. Default excludes retired; opt in via `IncludeRetired: true`. Filters: tag-match, content-substring, time range. |
+
+### Why Mem0-style + why these specific ops
+
+- **Mem0** (the memory-operations pattern from the Mem0 library)
+  is the canonical "agent memory" interface for the same reason
+  Markdown is the canonical text format: it's the lowest common
+  denominator that the entire ecosystem assumes. Adopting it lets
+  agent loops written against any Mem0-aware substrate work here.
+- Update vs Revise are deliberately separate. Update is "I noticed
+  a typo in my note." Revise is "I now believe something different
+  than I did when I wrote this; preserve the old belief for audit."
+  Conflating them loses the audit trail.
+- Retire vs Delete is deliberate. Retire stops a trace from
+  surfacing in search but preserves it for history reconstruction.
+  Delete (which we don't expose) would break references.
+
+### Trace data shape
+
+```go
+type Trace struct {
+	UID            string          // UUID v4 unless caller provides one
+	Content        json.RawMessage // opaque, schema is caller's contract
+	PredecessorUID string          // empty if root revision
+	CreatedAtNs    int64
+	UpdatedAtNs    int64
+	Retired        bool
+	ReplayCount    int      // ≥1 for any stored trace
+	Tags           []string // for Search
+}
+```
+
+`Content` is opaque JSON (not a struct) so callers can store any
+shape — the data model doesn't constrain semantics. Callers add
+their own validators on top.
+
+### Persistence
+
+JSONL append-only log under `_pathway/.jsonl`. Each
+mutation appends one JSON line:
+
+```
+{"op":"add", "trace":{...}}
+{"op":"update", "uid":"…", "content":"…"}
+{"op":"revise", "trace":{...}}  # trace.PredecessorUID is set
+{"op":"retire", "uid":"…"}
+{"op":"replay", "uid":"…"}      # idempotent re-add hit
+```
+
+On startup, replay every line in order, building in-memory state.
+A malformed line logs a warning and is skipped; load continues.
+Corruption tolerance is non-optional — partial state is better +than no state for an agent substrate. + +Compaction is a future concern. A 100K-trace log replays in +seconds; below that scale, JSONL append is the simplest correct +choice. When compaction lands, the format will be: snapshot file +(full state JSON) + tail JSONL since snapshot. Detect snapshot, +load it, then replay tail. + +### Cycle safety + +UIDs are generated server-side via `uuid.New()` (existing dep — +catalogd uses it). New UID for every Add and Revise. The data +model itself can't form cycles — every Revise points at an +EXISTING uid, and the new uid didn't exist a moment ago. + +History walks defensively anyway: visited-set tracks UIDs seen +this walk; if we encounter a duplicate, return error. Protects +against corruption (manual edit, bug in a future op) without +constraining the happy path. + +### Storage location + +JSONL file path is configurable per store. Default: +`/var/lib/lakehouse/pathway/.jsonl` for prod; tests use +`t.TempDir()`. Persistence is OPTIONAL — empty path means +in-memory only (matches vectord G1's pattern). + +### What this ADR does NOT do + +- **No HTTP surface decision.** Whether `cmd/pathwayd` is its own + binary or routes get added to `cmd/vectord` is the next ADR's + concern. The substrate is a pure library either way. +- **No vector index integration.** Pathway traces can carry a + vector embedding in `Content` (caller decides), but this ADR + doesn't define how the substrate integrates with `vectord`'s + HNSW indexes. That's the staffing co-pilot's design problem + when those layers compose. +- **No agent-loop semantics.** "When does an agent ADD vs + REVISE?" is a workflow decision, not a substrate decision. + +--- + +(Future ADRs from ADR-005 onward will be added as the Go +implementation accrues design decisions — e.g. observer fail-safe +semantics, distillation rebuild, gRPC adapter wire format, etc.) 
diff --git a/internal/pathway/persistor.go b/internal/pathway/persistor.go
new file mode 100644
index 0000000..056d8eb
--- /dev/null
+++ b/internal/pathway/persistor.go
@@ -0,0 +1,130 @@
+// persistor.go — JSONL append-only persistence for pathway memory.
+//
+// Each event is one JSON line. Append is O(1) (open append, write,
+// close — Go's *os.File does not fsync by default, i.e. "rely on
+// the OS", which is fine here; correctness on power loss is
+// best-effort, not transactional). Replay reads the file once at
+// startup.
+//
+// Corruption recovery: malformed lines log a warning and are
+// skipped, but do not stop the load; Replay returns the count of
+// successfully applied events. Partial state is better than no
+// state for an agent substrate.
+//
+// What's NOT here:
+//   - Compaction. JSONL grows linearly with mutations; below 100K
+//     traces this is fine. Compaction will land when needed and
+//     will emit a snapshot file + tail JSONL.
+//   - fsync per write. We rely on the OS's eventual writeback;
+//     trace loss on hard crash is acceptable for the substrate's
+//     "remember most things" guarantee.
+
+package pathway
+
+import (
+	"bufio"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/fs"
+	"log/slog"
+	"os"
+	"path/filepath"
+)
+
+// Persistor wraps a single JSONL file. Construct with NewPersistor;
+// it does NOT load on construction — callers must call Store.Load()
+// to replay.
+type Persistor struct {
+	path string
+}
+
+// NewPersistor returns a persistor for the given file path. The
+// parent directory is created on demand. The file is created lazily
+// on first Append.
+func NewPersistor(path string) (*Persistor, error) {
+	if path == "" {
+		return nil, errors.New("pathway: persistor path is empty")
+	}
+	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
+		return nil, fmt.Errorf("pathway: create dir: %w", err)
+	}
+	return &Persistor{path: path}, nil
+}
+
+// Path returns the underlying file path. Useful for tests + logs.
+func (p *Persistor) Path() string { return p.path } + +// Append writes one event to the JSONL log. Each call opens the +// file in append mode, writes one line, and closes — simple but +// correct. A pooled persistent fd is a future optimization if +// profiling shows append-rate matters. +func (p *Persistor) Append(e event) error { + line, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("pathway: marshal event: %w", err) + } + f, err := os.OpenFile(p.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("pathway: open log: %w", err) + } + defer f.Close() + if _, err := f.Write(line); err != nil { + return fmt.Errorf("pathway: write event: %w", err) + } + if _, err := f.Write([]byte{'\n'}); err != nil { + return fmt.Errorf("pathway: write newline: %w", err) + } + return nil +} + +// Replay reads the log line-by-line and invokes apply for each +// event. Returns the count of events successfully applied. A +// missing file is NOT an error (means "no prior state"); a +// partially-corrupt file logs warns and continues. +func (p *Persistor) Replay(apply func(event) error) (int, error) { + f, err := os.Open(p.path) + if errors.Is(err, fs.ErrNotExist) { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("pathway: open log: %w", err) + } + defer f.Close() + + scanner := bufio.NewScanner(f) + // Big buffer for unusually long content — 1 MiB per line cap. 
+ buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1<<20) + + applied := 0 + skipped := 0 + lineNo := 0 + for scanner.Scan() { + lineNo++ + raw := scanner.Bytes() + if len(raw) == 0 { + continue + } + var e event + if err := json.Unmarshal(raw, &e); err != nil { + slog.Warn("pathway: replay skipped malformed line", + "path", p.path, "line", lineNo, "err", err.Error()) + skipped++ + continue + } + if err := apply(e); err != nil { + slog.Warn("pathway: replay event apply failed", + "path", p.path, "line", lineNo, "op", e.Op, "err", err.Error()) + skipped++ + continue + } + applied++ + } + if err := scanner.Err(); err != nil { + return applied, fmt.Errorf("pathway: scan log: %w", err) + } + if skipped > 0 { + slog.Info("pathway: replay completed with skips", + "path", p.path, "applied", applied, "skipped", skipped) + } + return applied, nil +} diff --git a/internal/pathway/persistor_test.go b/internal/pathway/persistor_test.go new file mode 100644 index 0000000..763079e --- /dev/null +++ b/internal/pathway/persistor_test.go @@ -0,0 +1,184 @@ +package pathway + +import ( + "encoding/json" + "errors" + "os" + "path/filepath" + "strings" + "testing" +) + +// persistor_test covers the corruption-recovery contract per +// Sprint 2 row 7: malformed JSONL lines must not halt replay. 
+ +func TestPersistor_MissingFileIsNotError(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "nonexistent.jsonl") + p, err := NewPersistor(path) + if err != nil { + t.Fatalf("NewPersistor on missing file should not error, got %v", err) + } + n, err := p.Replay(func(event) error { return nil }) + if err != nil { + t.Errorf("Replay on missing file should be 0,nil; got %d, %v", n, err) + } + if n != 0 { + t.Errorf("Replay on missing file replayed %d events, want 0", n) + } +} + +func TestPersistor_AppendThenReplay(t *testing.T) { + p := mustPersistor(t) + + if err := p.Append(event{Op: opAdd, Trace: &Trace{UID: "A", Content: json.RawMessage(`{}`)}}); err != nil { + t.Fatalf("Append: %v", err) + } + if err := p.Append(event{Op: opAdd, Trace: &Trace{UID: "B", Content: json.RawMessage(`{}`)}}); err != nil { + t.Fatalf("Append: %v", err) + } + + var seen []string + n, err := p.Replay(func(e event) error { + if e.Trace != nil { + seen = append(seen, e.Trace.UID) + } + return nil + }) + if err != nil { + t.Fatalf("Replay: %v", err) + } + if n != 2 { + t.Errorf("Replay applied %d events, want 2", n) + } + if len(seen) != 2 || seen[0] != "A" || seen[1] != "B" { + t.Errorf("seen = %v, want [A B]", seen) + } +} + +func TestPersistor_CorruptedLines_Skipped(t *testing.T) { + p := mustPersistor(t) + + // Mix of valid and corrupted lines. + good1 := mustMarshal(t, event{Op: opAdd, Trace: &Trace{UID: "A", Content: json.RawMessage(`{}`)}}) + bad := []byte(`{this is not json}`) + good2 := mustMarshal(t, event{Op: opAdd, Trace: &Trace{UID: "B", Content: json.RawMessage(`{}`)}}) + emptyLine := []byte(``) + good3 := mustMarshal(t, event{Op: opAdd, Trace: &Trace{UID: "C", Content: json.RawMessage(`{}`)}}) + + contents := []byte{} + for _, line := range [][]byte{good1, bad, good2, emptyLine, good3} { + contents = append(contents, line...) 
+ contents = append(contents, '\n') + } + if err := os.WriteFile(p.Path(), contents, 0o644); err != nil { + t.Fatalf("write file: %v", err) + } + + var applied []string + n, err := p.Replay(func(e event) error { + if e.Trace != nil { + applied = append(applied, e.Trace.UID) + } + return nil + }) + if err != nil { + t.Fatalf("Replay: %v", err) + } + // 3 valid + 1 bad + 1 empty (skipped silently) = 3 applied. + if n != 3 { + t.Errorf("Replay applied %d, want 3 (1 corrupt line skipped)", n) + } + if len(applied) != 3 || applied[0] != "A" || applied[1] != "B" || applied[2] != "C" { + t.Errorf("applied = %v, want [A B C]", applied) + } +} + +func TestPersistor_ApplyError_Skipped(t *testing.T) { + // If the apply function returns error for an event, replay + // should keep going (the error is logged, not raised). + p := mustPersistor(t) + _ = p.Append(event{Op: opAdd, Trace: &Trace{UID: "A", Content: json.RawMessage(`{}`)}}) + _ = p.Append(event{Op: opAdd, Trace: &Trace{UID: "B", Content: json.RawMessage(`{}`)}}) + _ = p.Append(event{Op: opAdd, Trace: &Trace{UID: "C", Content: json.RawMessage(`{}`)}}) + + count := 0 + n, err := p.Replay(func(e event) error { + if e.Trace != nil && e.Trace.UID == "B" { + return errors.New("simulated apply error on B") + } + count++ + return nil + }) + if err != nil { + t.Fatalf("Replay: %v", err) + } + if n != 2 || count != 2 { + t.Errorf("Replay applied %d (callback called %d), want 2 each (B's error skipped)", n, count) + } +} + +func TestPersistor_NewPersistor_EmptyPath_Errors(t *testing.T) { + _, err := NewPersistor("") + if err == nil { + t.Error("NewPersistor with empty path should error") + } +} + +func TestPersistor_CreatesParentDir(t *testing.T) { + dir := t.TempDir() + nested := filepath.Join(dir, "nested", "deep", "pathway.jsonl") + p, err := NewPersistor(nested) + if err != nil { + t.Fatalf("NewPersistor: %v", err) + } + if err := p.Append(event{Op: opAdd, Trace: &Trace{UID: "A", Content: json.RawMessage(`{}`)}}); err != nil 
{ + t.Fatalf("Append after creating nested dir: %v", err) + } +} + +func TestPersistor_LongLine_HandlesUpTo1MiB(t *testing.T) { + p := mustPersistor(t) + + // Build a content blob ~750 KiB so the JSON line is ~800 KiB + // (under the 1 MiB scanner cap). + blob := strings.Repeat("x", 750*1024) + bigContent, _ := json.Marshal(map[string]string{"data": blob}) + tr := &Trace{UID: "BIG", Content: bigContent} + if err := p.Append(event{Op: opAdd, Trace: tr}); err != nil { + t.Fatalf("Append big trace: %v", err) + } + + count := 0 + n, _ := p.Replay(func(e event) error { + if e.Trace != nil && e.Trace.UID == "BIG" { + count++ + } + return nil + }) + if n != 1 || count != 1 { + t.Errorf("big-line replay: got %d events / %d matches, want 1 each", n, count) + } +} + +// ── helpers ── + +func mustPersistor(t *testing.T) *Persistor { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "test.jsonl") + p, err := NewPersistor(path) + if err != nil { + t.Fatalf("NewPersistor: %v", err) + } + return p +} + +func mustMarshal(t *testing.T, e event) []byte { + t.Helper() + b, err := json.Marshal(e) + if err != nil { + t.Fatalf("marshal: %v", err) + } + return b +} diff --git a/internal/pathway/store.go b/internal/pathway/store.go new file mode 100644 index 0000000..c3ba3db --- /dev/null +++ b/internal/pathway/store.go @@ -0,0 +1,381 @@ +// store.go — the in-memory side of pathway memory. Persistence +// (load/append-on-mutate) is in persistor.go; the Store can be +// constructed without persistence for tests and ephemeral uses. + +package pathway + +import ( + "bytes" + "encoding/json" + "errors" + "sync" + "time" + + "github.com/google/uuid" +) + +// Store is the in-memory pathway memory. Thread-safe via a single +// RWMutex (read-heavy workloads are the norm; mutations are +// individual operations not hot loops). +type Store struct { + mu sync.RWMutex + // traces[uid] → *Trace. Single map covers both retired and + // active traces; Search filters retired by default. 
+ traces map[string]*Trace + + // persistor is optional — nil = in-memory only (test mode + // and ephemeral G2 uses). + persistor *Persistor + + // nowFn returns "the current time in nanoseconds" — overridden + // in tests for deterministic timestamps. + nowFn func() int64 + + // uidFn generates new UIDs — overridden in tests for + // deterministic UID sequences. + uidFn func() string +} + +// NewStore builds an empty Store. Pass nil persistor for in-memory +// mode. The returned store is ready to receive operations; if +// persistor is non-nil, call Load(ctx) before issuing operations to +// rehydrate prior state. +func NewStore(persistor *Persistor) *Store { + return &Store{ + traces: make(map[string]*Trace), + persistor: persistor, + nowFn: func() int64 { return time.Now().UnixNano() }, + uidFn: func() string { return uuid.New().String() }, + } +} + +// Load replays the persistor's JSONL log and rebuilds in-memory +// state. Safe to call multiple times — each call resets the in- +// memory state to whatever the log says. Corruption (malformed +// lines, broken events) is logged-not-fatal: the load proceeds +// with the partial state it can recover. +// +// Returns the number of events successfully applied. +func (s *Store) Load() (int, error) { + if s.persistor == nil { + return 0, nil + } + s.mu.Lock() + defer s.mu.Unlock() + s.traces = make(map[string]*Trace) // reset + return s.persistor.Replay(func(e event) error { + return s.applyEventLocked(e) + }) +} + +// applyEventLocked is the single point where events update the +// in-memory map. Used by both Load (replaying log) and the +// mutating methods (after appending to the log). Caller MUST hold +// s.mu in write mode. 
+func (s *Store) applyEventLocked(e event) error {
+	switch e.Op {
+	case opAdd, opRevise:
+		if e.Trace == nil || e.Trace.UID == "" {
+			return ErrInvalidContent
+		}
+		// Add semantics: if UID already exists, this should have been
+		// a replay — but be permissive on Replay to handle older logs.
+		s.traces[e.Trace.UID] = e.Trace
+		return nil
+	case opUpdate:
+		t, ok := s.traces[e.UID]
+		if !ok {
+			return ErrNotFound
+		}
+		t.Content = e.Content
+		// NOTE: update events carry no timestamp, so an update
+		// replayed from the log is re-stamped at load time rather
+		// than at its original mutation time.
+		t.UpdatedAtNs = s.nowFn()
+		return nil
+	case opRetire:
+		t, ok := s.traces[e.UID]
+		if !ok {
+			return ErrNotFound
+		}
+		t.Retired = true
+		t.UpdatedAtNs = s.nowFn() // same load-time re-stamp caveat as opUpdate
+		return nil
+	case opReplay:
+		t, ok := s.traces[e.UID]
+		if !ok {
+			return ErrNotFound
+		}
+		t.ReplayCount++
+		return nil
+	default:
+		return errors.New("pathway: unknown op")
+	}
+}
+
+// Add stores a new trace with a fresh UID and replay_count=1.
+// Returns the stored trace (with UID + timestamps populated).
+func (s *Store) Add(content json.RawMessage, tags ...string) (*Trace, error) {
+	if !json.Valid(content) {
+		return nil, ErrInvalidContent
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	now := s.nowFn()
+	t := &Trace{
+		UID:         s.uidFn(),
+		Content:     content,
+		CreatedAtNs: now,
+		UpdatedAtNs: now,
+		ReplayCount: 1,
+		Tags:        copyTags(tags),
+	}
+	if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil {
+		return nil, err
+	}
+	// Clone before returning so the caller can't mutate the in-memory
+	// trace through the returned pointer (matches Get's contract).
+	return cloneTrace(t), nil
+}
+
+// AddIdempotent stores a trace under the given UID, OR — if the
+// UID already exists — increments its ReplayCount. Used by agent
+// loops that want to record "I tried this same thing again."
+func (s *Store) AddIdempotent(uid string, content json.RawMessage, tags ...string) (*Trace, error) { + if uid == "" { + return nil, ErrEmptyUID + } + if !json.Valid(content) { + return nil, ErrInvalidContent + } + s.mu.Lock() + defer s.mu.Unlock() + + if existing, ok := s.traces[uid]; ok { + // Replay: increment count, persist as opReplay event. + if err := s.appendAndApplyLocked(event{Op: opReplay, UID: uid}); err != nil { + return nil, err + } + // Return a copy to avoid the caller mutating the in-memory + // trace through the returned pointer. + return cloneTrace(existing), nil + } + + now := s.nowFn() + t := &Trace{ + UID: uid, + Content: content, + CreatedAtNs: now, + UpdatedAtNs: now, + ReplayCount: 1, + Tags: copyTags(tags), + } + if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil { + return nil, err + } + return cloneTrace(t), nil +} + +// Update replaces the content of an existing trace. Same UID, new +// content. NOT a revision — use Revise when the new content +// represents a change-of-belief that should preserve the old. +func (s *Store) Update(uid string, content json.RawMessage) error { + if uid == "" { + return ErrEmptyUID + } + if !json.Valid(content) { + return ErrInvalidContent + } + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.traces[uid]; !ok { + return ErrNotFound + } + return s.appendAndApplyLocked(event{Op: opUpdate, UID: uid, Content: content}) +} + +// Revise creates a new trace whose PredecessorUID points at an +// existing trace. Old trace stays accessible via Get and History. +// Returns the new trace. 
+func (s *Store) Revise(predecessorUID string, content json.RawMessage, tags ...string) (*Trace, error) { + if predecessorUID == "" { + return nil, ErrEmptyUID + } + if !json.Valid(content) { + return nil, ErrInvalidContent + } + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.traces[predecessorUID]; !ok { + return nil, ErrPredecessorMissing + } + now := s.nowFn() + t := &Trace{ + UID: s.uidFn(), + Content: content, + PredecessorUID: predecessorUID, + CreatedAtNs: now, + UpdatedAtNs: now, + ReplayCount: 1, + Tags: copyTags(tags), + } + if err := s.appendAndApplyLocked(event{Op: opRevise, Trace: t}); err != nil { + return nil, err + } + return cloneTrace(t), nil +} + +// Retire marks a trace as retired. Retired traces are excluded +// from Search by default but accessible via Get and History. +func (s *Store) Retire(uid string) error { + if uid == "" { + return ErrEmptyUID + } + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.traces[uid]; !ok { + return ErrNotFound + } + return s.appendAndApplyLocked(event{Op: opRetire, UID: uid}) +} + +// Get returns a copy of the trace with the given UID. Includes +// retired traces (caller decides what to do with them). +func (s *Store) Get(uid string) (*Trace, error) { + s.mu.RLock() + defer s.mu.RUnlock() + t, ok := s.traces[uid] + if !ok { + return nil, ErrNotFound + } + return cloneTrace(t), nil +} + +// History returns the chain of traces from this UID backward +// through PredecessorUID links. Slot 0 is the queried trace; slot +// 1 is its predecessor; and so on. Cycle-safe: a UID that appears +// twice during the walk returns ErrCycle (only happens if the +// persistence file was hand-edited or there's a bug elsewhere). 
+func (s *Store) History(uid string) ([]*Trace, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var chain []*Trace + visited := make(map[string]struct{}) + cursor := uid + for cursor != "" { + if _, seen := visited[cursor]; seen { + return nil, ErrCycle + } + visited[cursor] = struct{}{} + + t, ok := s.traces[cursor] + if !ok { + if len(chain) == 0 { + return nil, ErrNotFound + } + // Predecessor missing mid-chain — return what we have. + break + } + chain = append(chain, cloneTrace(t)) + cursor = t.PredecessorUID + } + return chain, nil +} + +// Search returns traces matching the filter. Excludes retired by +// default; pass IncludeRetired: true to include them. Returns a +// new slice of trace copies — caller can mutate freely. +func (s *Store) Search(filter SearchFilter) []*Trace { + s.mu.RLock() + defer s.mu.RUnlock() + + var out []*Trace + for _, t := range s.traces { + if t.Retired && !filter.IncludeRetired { + continue + } + if filter.Tag != "" && !containsTag(t.Tags, filter.Tag) { + continue + } + if filter.ContentContains != "" && + !bytes.Contains(t.Content, []byte(filter.ContentContains)) { + continue + } + if filter.CreatedAfterNs > 0 && t.CreatedAtNs < filter.CreatedAfterNs { + continue + } + if filter.CreatedBeforeNs > 0 && t.CreatedAtNs > filter.CreatedBeforeNs { + continue + } + out = append(out, cloneTrace(t)) + } + return out +} + +// Stats returns lifetime counters useful for /stats endpoints and +// operator dashboards. +type Stats struct { + Total int + Active int + Retired int +} + +func (s *Store) Stats() Stats { + s.mu.RLock() + defer s.mu.RUnlock() + st := Stats{Total: len(s.traces)} + for _, t := range s.traces { + if t.Retired { + st.Retired++ + } else { + st.Active++ + } + } + return st +} + +// appendAndApplyLocked is the single-point write path: persist the +// event first (so a crash mid-mutation doesn't leave in-memory +// state ahead of the log), then apply it in memory. Caller holds +// s.mu in write mode. 
+func (s *Store) appendAndApplyLocked(e event) error { + if s.persistor != nil { + if err := s.persistor.Append(e); err != nil { + return err + } + } + return s.applyEventLocked(e) +} + +// cloneTrace returns a deep copy so callers can't mutate the +// in-memory trace through the returned pointer. +func cloneTrace(t *Trace) *Trace { + c := *t + if t.Content != nil { + c.Content = append(json.RawMessage(nil), t.Content...) + } + if t.Tags != nil { + c.Tags = append([]string(nil), t.Tags...) + } + return &c +} + +func copyTags(in []string) []string { + if len(in) == 0 { + return nil + } + out := make([]string, len(in)) + copy(out, in) + return out +} + +func containsTag(tags []string, want string) bool { + for _, t := range tags { + if t == want { + return true + } + } + return false +} diff --git a/internal/pathway/store_test.go b/internal/pathway/store_test.go new file mode 100644 index 0000000..fc0c57a --- /dev/null +++ b/internal/pathway/store_test.go @@ -0,0 +1,398 @@ +package pathway + +import ( + "encoding/json" + "errors" + "path/filepath" + "strconv" + "strings" + "testing" +) + +// Closes Sprint 2 design-bar work from the audit. Tests cover all 7 +// claim rows from claim-coverage-table.md: ADD, UPDATE, REVISE, +// RETIRE, HISTORY chain cycle-safe, replay-count duplicate ADD, +// corrupted memory row recovery (corrupted_test.go). + +// newTestStore returns an in-memory Store with deterministic UID + +// time generation for repeatable assertions. 
+func newTestStore(t *testing.T) *Store { + t.Helper() + s := NewStore(nil) + var counter int + var clock int64 + s.uidFn = func() string { + counter++ + return "uid-" + strconv.Itoa(counter) + } + s.nowFn = func() int64 { + clock++ + return clock + } + return s +} + +func newPersistedStore(t *testing.T) (*Store, string) { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "pathway.jsonl") + p, err := NewPersistor(path) + if err != nil { + t.Fatalf("NewPersistor: %v", err) + } + s := NewStore(p) + var counter int + var clock int64 + s.uidFn = func() string { + counter++ + return "uid-" + strconv.Itoa(counter) + } + s.nowFn = func() int64 { + clock++ + return clock + } + return s, path +} + +// ── Sprint 2 row 1: ADD a new pathway trace ──────────────────── + +func TestAdd_AssignsUIDAndTimestamps(t *testing.T) { + s := newTestStore(t) + tr, err := s.Add(json.RawMessage(`{"k":"v"}`), "tag-a") + if err != nil { + t.Fatalf("Add: %v", err) + } + if tr.UID != "uid-1" { + t.Errorf("UID = %q, want uid-1", tr.UID) + } + if tr.ReplayCount != 1 { + t.Errorf("ReplayCount = %d, want 1", tr.ReplayCount) + } + if tr.Retired { + t.Error("freshly-added trace should NOT be retired") + } + if tr.CreatedAtNs == 0 || tr.UpdatedAtNs == 0 { + t.Error("timestamps unset") + } + if len(tr.Tags) != 1 || tr.Tags[0] != "tag-a" { + t.Errorf("Tags = %v, want [tag-a]", tr.Tags) + } +} + +func TestAdd_RejectsInvalidJSON(t *testing.T) { + s := newTestStore(t) + _, err := s.Add(json.RawMessage(`not json`)) + if !errors.Is(err, ErrInvalidContent) { + t.Errorf("expected ErrInvalidContent, got %v", err) + } +} + +// ── Sprint 2 row 2: UPDATE replaces existing trace by uid ────── + +func TestUpdate_ReplacesContentSameUID(t *testing.T) { + s := newTestStore(t) + tr, _ := s.Add(json.RawMessage(`{"v":1}`)) + + if err := s.Update(tr.UID, json.RawMessage(`{"v":2}`)); err != nil { + t.Fatalf("Update: %v", err) + } + + got, _ := s.Get(tr.UID) + if string(got.Content) != `{"v":2}` { + 
t.Errorf("content = %s, want updated", got.Content) + } + if got.UpdatedAtNs == tr.UpdatedAtNs { + t.Error("UpdatedAtNs should bump on Update") + } +} + +func TestUpdate_MissingUID_Errors(t *testing.T) { + s := newTestStore(t) + err := s.Update("nonexistent", json.RawMessage(`{}`)) + if !errors.Is(err, ErrNotFound) { + t.Errorf("expected ErrNotFound, got %v", err) + } +} + +// ── Sprint 2 row 3: REVISE creates a new revision linked via history ── + +func TestRevise_LinksToPredecessorViaHistory(t *testing.T) { + s := newTestStore(t) + root, _ := s.Add(json.RawMessage(`{"v":1}`)) + rev, err := s.Revise(root.UID, json.RawMessage(`{"v":2}`)) + if err != nil { + t.Fatalf("Revise: %v", err) + } + if rev.PredecessorUID != root.UID { + t.Errorf("PredecessorUID = %q, want %q", rev.PredecessorUID, root.UID) + } + if rev.UID == root.UID { + t.Error("Revise must produce a NEW UID") + } +} + +func TestRevise_PredecessorMissing_Errors(t *testing.T) { + s := newTestStore(t) + _, err := s.Revise("ghost-uid", json.RawMessage(`{}`)) + if !errors.Is(err, ErrPredecessorMissing) { + t.Errorf("expected ErrPredecessorMissing, got %v", err) + } +} + +func TestRevise_ChainOfThree_BackwardWalk(t *testing.T) { + s := newTestStore(t) + a, _ := s.Add(json.RawMessage(`{"v":1}`)) + b, _ := s.Revise(a.UID, json.RawMessage(`{"v":2}`)) + c, _ := s.Revise(b.UID, json.RawMessage(`{"v":3}`)) + + chain, err := s.History(c.UID) + if err != nil { + t.Fatalf("History: %v", err) + } + want := []string{c.UID, b.UID, a.UID} + if len(chain) != 3 { + t.Fatalf("chain length = %d, want 3", len(chain)) + } + for i, tr := range chain { + if tr.UID != want[i] { + t.Errorf("chain[%d].UID = %q, want %q", i, tr.UID, want[i]) + } + } +} + +// ── Sprint 2 row 4: RETIRE marks trace excluded from retrieval ── + +func TestRetire_ExcludedFromSearch(t *testing.T) { + s := newTestStore(t) + a, _ := s.Add(json.RawMessage(`{"v":1}`), "common") + b, _ := s.Add(json.RawMessage(`{"v":2}`), "common") + if err := s.Retire(a.UID); 
err != nil { + t.Fatalf("Retire: %v", err) + } + + results := s.Search(SearchFilter{Tag: "common"}) + if len(results) != 1 || results[0].UID != b.UID { + t.Errorf("Search excluded retired? got %d results, want 1 (active only)", len(results)) + } + + // IncludeRetired flag returns both. + withRetired := s.Search(SearchFilter{Tag: "common", IncludeRetired: true}) + if len(withRetired) != 2 { + t.Errorf("IncludeRetired Search returned %d, want 2", len(withRetired)) + } +} + +func TestRetire_StillAccessibleViaGet(t *testing.T) { + // Per ADR-004: "Retired traces are excluded from Search by default + // but accessible via Get and History." Locks that contract. + s := newTestStore(t) + tr, _ := s.Add(json.RawMessage(`{"v":1}`)) + s.Retire(tr.UID) + + got, err := s.Get(tr.UID) + if err != nil { + t.Fatalf("retired trace Get: %v", err) + } + if !got.Retired { + t.Error("Get should preserve retired flag") + } +} + +func TestRetire_StillAccessibleViaHistory(t *testing.T) { + s := newTestStore(t) + a, _ := s.Add(json.RawMessage(`{"v":1}`)) + b, _ := s.Revise(a.UID, json.RawMessage(`{"v":2}`)) + s.Retire(a.UID) + + chain, err := s.History(b.UID) + if err != nil { + t.Fatalf("History: %v", err) + } + if len(chain) != 2 { + t.Errorf("chain length = %d, want 2 (revision + retired root)", len(chain)) + } + if !chain[1].Retired { + t.Error("retired predecessor should still appear in History with Retired=true") + } +} + +// ── Sprint 2 row 5: HISTORY chain is cycle-safe ──────────────── + +func TestHistory_CycleDetected(t *testing.T) { + // Cycles can't form via the public API (new UIDs every Revise), + // but corruption could create one. Inject one directly into the + // internal map and verify History rejects it. 
+ s := newTestStore(t) + s.traces["A"] = &Trace{UID: "A", PredecessorUID: "B"} + s.traces["B"] = &Trace{UID: "B", PredecessorUID: "A"} + + _, err := s.History("A") + if !errors.Is(err, ErrCycle) { + t.Errorf("expected ErrCycle, got %v", err) + } +} + +func TestHistory_PredecessorMissing_TruncatesChain(t *testing.T) { + s := newTestStore(t) + tr := &Trace{UID: "X", PredecessorUID: "ghost"} + s.traces["X"] = tr + + chain, err := s.History("X") + if err != nil { + t.Fatalf("History on partial chain: %v", err) + } + if len(chain) != 1 { + t.Errorf("partial chain returned %d, want 1 (truncate at missing predecessor)", len(chain)) + } +} + +func TestHistory_UnknownUID_ErrorsClean(t *testing.T) { + s := newTestStore(t) + _, err := s.History("nope") + if !errors.Is(err, ErrNotFound) { + t.Errorf("expected ErrNotFound, got %v", err) + } +} + +// ── Sprint 2 row 6: replay_count increments on duplicate ADD ─── + +func TestAddIdempotent_IncrementsReplayCount(t *testing.T) { + s := newTestStore(t) + + first, err := s.AddIdempotent("custom-uid", json.RawMessage(`{"v":1}`)) + if err != nil { + t.Fatalf("first AddIdempotent: %v", err) + } + if first.ReplayCount != 1 { + t.Errorf("first ReplayCount = %d, want 1", first.ReplayCount) + } + + second, err := s.AddIdempotent("custom-uid", json.RawMessage(`{"v":"different"}`)) + if err != nil { + t.Fatalf("second AddIdempotent: %v", err) + } + if second.ReplayCount != 2 { + t.Errorf("after second add, ReplayCount = %d, want 2", second.ReplayCount) + } + + // Original content preserved (replay does NOT overwrite). 
+ if !strings.Contains(string(second.Content), "v") || + !strings.Contains(string(second.Content), "1") { + t.Errorf("replay should preserve original content, got %s", second.Content) + } +} + +func TestAddIdempotent_RejectsEmptyUID(t *testing.T) { + s := newTestStore(t) + _, err := s.AddIdempotent("", json.RawMessage(`{}`)) + if !errors.Is(err, ErrEmptyUID) { + t.Errorf("expected ErrEmptyUID, got %v", err) + } +} + +// ── Sprint 2 row 7: corrupted memory row recovery ───────────── + +func TestPersistor_RoundTrip(t *testing.T) { + s, path := newPersistedStore(t) + + a, _ := s.Add(json.RawMessage(`{"v":1}`), "alpha") + b, _ := s.Revise(a.UID, json.RawMessage(`{"v":2}`), "alpha") + s.Retire(a.UID) + _ = b + + // Open fresh store against same file, replay. + p, _ := NewPersistor(path) + s2 := NewStore(p) + n, err := s2.Load() + if err != nil { + t.Fatalf("Load: %v", err) + } + if n != 3 { + t.Errorf("replayed %d events, want 3", n) + } + stats := s2.Stats() + if stats.Total != 2 { + t.Errorf("Stats.Total = %d, want 2", stats.Total) + } + if stats.Retired != 1 { + t.Errorf("Stats.Retired = %d, want 1", stats.Retired) + } + + got, _ := s2.Get(a.UID) + if !got.Retired { + t.Error("retired flag lost across persistence round-trip") + } +} + +// ── Search filter coverage ───────────────────────────────────── + +func TestSearch_TagFilter(t *testing.T) { + s := newTestStore(t) + s.Add(json.RawMessage(`{"v":1}`), "production") + s.Add(json.RawMessage(`{"v":2}`), "test") + s.Add(json.RawMessage(`{"v":3}`), "production", "edge") + + prodHits := s.Search(SearchFilter{Tag: "production"}) + if len(prodHits) != 2 { + t.Errorf("tag=production returned %d, want 2", len(prodHits)) + } + + edgeHits := s.Search(SearchFilter{Tag: "edge"}) + if len(edgeHits) != 1 { + t.Errorf("tag=edge returned %d, want 1", len(edgeHits)) + } +} + +func TestSearch_ContentContainsFilter(t *testing.T) { + s := newTestStore(t) + s.Add(json.RawMessage(`{"role":"welder","city":"Chicago"}`)) + 
s.Add(json.RawMessage(`{"role":"electrician","city":"Detroit"}`)) + s.Add(json.RawMessage(`{"role":"safety","city":"Chicago"}`)) + + chi := s.Search(SearchFilter{ContentContains: "Chicago"}) + if len(chi) != 2 { + t.Errorf("ContentContains=Chicago returned %d, want 2", len(chi)) + } +} + +func TestStats_TracksAllStates(t *testing.T) { + s := newTestStore(t) + a, _ := s.Add(json.RawMessage(`{}`)) + s.Add(json.RawMessage(`{}`)) + s.Add(json.RawMessage(`{}`)) + s.Retire(a.UID) + + st := s.Stats() + if st.Total != 3 { + t.Errorf("Total = %d, want 3", st.Total) + } + if st.Active != 2 { + t.Errorf("Active = %d, want 2", st.Active) + } + if st.Retired != 1 { + t.Errorf("Retired = %d, want 1", st.Retired) + } +} + +// ── Concurrency safety ──────────────────────────────────────── + +func TestStore_ConcurrentAdd(t *testing.T) { + s := newTestStore(t) + const N = 100 + done := make(chan bool, N) + for i := 0; i < N; i++ { + go func() { + _, err := s.Add(json.RawMessage(`{"x":1}`)) + if err != nil { + t.Errorf("concurrent Add: %v", err) + } + done <- true + }() + } + for i := 0; i < N; i++ { + <-done + } + if s.Stats().Total != N { + t.Errorf("after %d concurrent Adds, Total = %d", N, s.Stats().Total) + } +} diff --git a/internal/pathway/types.go b/internal/pathway/types.go new file mode 100644 index 0000000..64fe7c5 --- /dev/null +++ b/internal/pathway/types.go @@ -0,0 +1,89 @@ +// Package pathway implements Mem0-style versioned trace memory per +// ADR-004. Pathway memory is an append-only event log of opaque +// traces with Add / Update / Revise / Retire / History / Search +// operations. Persisted via JSONL (one event per line) with +// corruption recovery on load. +// +// Why this exists: agents need to remember what they tried and +// what worked. Mem0 is the lowest-common-denominator memory +// substrate; building on its surface means agent loops written +// against any Mem0-aware library work here. 
See feedback_meta_
+// index_vision.md for the north-star learning-loop framing.
+package pathway
+
+import (
+	"encoding/json"
+	"errors"
+)
+
+// Trace is one entry in pathway memory. Content is opaque to the
+// substrate — callers store whatever JSON shape they want; this
+// layer just preserves and indexes it.
+type Trace struct {
+	UID            string          `json:"uid"`
+	Content        json.RawMessage `json:"content"`
+	PredecessorUID string          `json:"predecessor_uid,omitempty"`
+	CreatedAtNs    int64           `json:"created_at_ns"`
+	UpdatedAtNs    int64           `json:"updated_at_ns"`
+	Retired        bool            `json:"retired"`
+	ReplayCount    int             `json:"replay_count"`
+	Tags           []string        `json:"tags,omitempty"`
+}
+
+// op is the wire-format kind tag for JSONL persistence. Internal
+// to the package — operations exposed publicly are method calls
+// on Store; the JSONL form is its own concern.
+type op string
+
+const (
+	opAdd    op = "add"
+	opUpdate op = "update"
+	opRevise op = "revise"
+	opRetire op = "retire"
+	opReplay op = "replay"
+)
+
+// event is one line of the JSONL log. Trace is included for ops
+// that introduce or replace a trace; UID alone suffices for retire
+// and replay; update carries both UID and Content (the new content
+// replaces the existing trace's content under the same UID).
+type event struct {
+	Op      op              `json:"op"`
+	Trace   *Trace          `json:"trace,omitempty"`
+	UID     string          `json:"uid,omitempty"`
+	Content json.RawMessage `json:"content,omitempty"`
+}
+
+// Errors surfaced to callers. Sentinel-based so HTTP handlers (when
+// cmd/pathwayd lands) can map to status codes via errors.Is.
+var (
+	ErrNotFound           = errors.New("pathway: trace not found")
+	ErrAlreadyExists      = errors.New("pathway: trace already exists")
+	ErrPredecessorMissing = errors.New("pathway: predecessor trace missing")
+	ErrCycle              = errors.New("pathway: history cycle detected")
+	ErrEmptyUID           = errors.New("pathway: empty uid")
+	ErrInvalidContent     = errors.New("pathway: invalid content")
+)
+
+// SearchFilter narrows a Search to matching traces. 
Empty filter
+// matches everything (excluding retired; set IncludeRetired to
+// override). All set fields are AND-combined.
+type SearchFilter struct {
+	// Tag keeps only traces whose Tags slice contains this string.
+	Tag string
+
+	// ContentContains keeps only traces whose Content contains this
+	// substring (Content is treated as raw bytes; whether that is
+	// meaningful is the caller's contract).
+	ContentContains string
+
+	// CreatedAfterNs keeps only traces with CreatedAtNs >= this value.
+	// Zero = no lower bound.
+	CreatedAfterNs int64
+
+	// CreatedBeforeNs keeps only traces with CreatedAtNs <= this value.
+	// Zero = no upper bound.
+	CreatedBeforeNs int64
+
+	// IncludeRetired flips the default "exclude retired" behavior.
+	IncludeRetired bool
+}