root 2a6234ff82 ADR-004 + internal/pathway: Mem0 versioned trace substrate
Closes Sprint 2 design-bar work (audit reports/scrum/sprint-backlog.md):
  S2.1 — ADR-004 documents the pathway-memory data model
  S2.2 — pathway port lands with deterministic fixture corpus
         and full test coverage on day one
  S2.3 — retired traces are excluded from retrieval (test
         passes; would fail without the filter)

Mem0-style operations: Add / AddIdempotent / Update / Revise /
Retire / Get / History / Search. Each operation is a method on
Store; persistence is JSONL append-only with corruption recovery
on Replay.

internal/pathway/types.go     Trace + event + SearchFilter + sentinel errors
internal/pathway/store.go     in-memory state + RWMutex + ops
internal/pathway/persistor.go JSONL append-only log with replay
internal/pathway/store_test.go  20 test funcs covering all 7
                                Sprint 2 claim rows + concurrency
internal/pathway/persistor_test.go  6 test funcs covering missing-
                                file, corruption recovery, long-line
                                handling, parent-dir auto-create,
                                apply-error skip behavior

Sprint 2 claim coverage row-by-row:
  ADD          TestAdd_AssignsUIDAndTimestamps + TestAdd_RejectsInvalidJSON
  UPDATE       TestUpdate_ReplacesContentSameUID + TestUpdate_MissingUID_Errors
  REVISE       TestRevise_LinksToPredecessorViaHistory +
               TestRevise_PredecessorMissing_Errors +
               TestRevise_ChainOfThree_BackwardWalk
  RETIRE       TestRetire_ExcludedFromSearch +
               TestRetire_StillAccessibleViaGet +
               TestRetire_StillAccessibleViaHistory
  HISTORY/cycle TestHistory_CycleDetected (injected via internal map),
                TestHistory_PredecessorMissing_TruncatesChain,
                TestHistory_UnknownUID_ErrorsClean
  REPLAY/dup   TestAddIdempotent_IncrementsReplayCount (locks the
               "replay preserves original content" rule per ADR-004)
  CORRUPTION   TestPersistor_CorruptedLines_Skipped +
               TestPersistor_ApplyError_Skipped
  ROUND-TRIP   TestPersistor_RoundTrip locks the full Save → fresh
               Store → Load → Stats-match contract

Two issues caught during testing (one code bug, one test bug):
  - Add returned the same *Trace stored in the map, so callers
    holding a reference saw later mutations. Fixed: clone before
    return (matches Get's contract). Same fix in AddIdempotent
    + Revise.
  - Test typo: {"v":different} isn't valid JSON; AddIdempotent's
    json.Valid rejected it as ErrInvalidContent. Test fixed to
    use {"v":"different"}; the validation behavior is correct.

Skipped this commit (next):
  - cmd/pathwayd HTTP binary
  - gateway routing for /v1/pathway/*
  - end-to-end smoke
  These add the wire surface; the substrate ships first so the
  wire layer can be a pure proxy in the next commit.

Verified:
  go test -count=1 ./internal/pathway/ — 26 tests green
  just verify                          — vet + test + 9 smokes (34s)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 07:23:30 -05:00

382 lines
9.8 KiB
Go

// store.go — the in-memory side of pathway memory. Persistence
// (load/append-on-mutate) is in persistor.go; the Store can be
// constructed without persistence for tests and ephemeral uses.
package pathway
import (
"bytes"
"encoding/json"
"errors"
"sync"
"time"
"github.com/google/uuid"
)
// Store is the in-memory pathway memory.
//
// All state is guarded by one RWMutex: read operations (Get, History,
// Search, Stats) share the read lock and every mutation takes the write
// lock. Read-heavy workloads are the norm; mutations are individual
// operations, not hot loops. Store holds a mutex and must not be copied —
// always work with the *Store returned by NewStore.
type Store struct {
	// mu guards traces (and serializes persistor appends, which only
	// happen under the write lock).
	mu sync.RWMutex

	// traces maps UID → trace. Active and retired traces share this one
	// map; Search drops retired entries unless the filter asks for them.
	traces map[string]*Trace

	// persistor is optional. nil means in-memory only (test mode and
	// ephemeral G2 uses).
	persistor *Persistor

	// Test seams: nowFn yields "now" in nanoseconds and uidFn yields new
	// UIDs. Tests replace them to get deterministic timestamps and UID
	// sequences.
	nowFn func() int64
	uidFn func() string
}
// NewStore builds an empty Store.
//
// Pass a nil persistor for in-memory-only mode. With a non-nil persistor,
// call Load() (it takes no arguments) before issuing operations, so that
// prior state is rehydrated from the log.
//
// Timestamps default to wall-clock time in nanoseconds
// (time.Now().UnixNano()), and UIDs default to uuid.New().String().
// Tests may override both through the nowFn/uidFn fields.
func NewStore(persistor *Persistor) *Store {
	return &Store{
		traces:    make(map[string]*Trace),
		persistor: persistor,
		nowFn:     func() int64 { return time.Now().UnixNano() },
		uidFn:     func() string { return uuid.New().String() },
	}
}
// Load rebuilds the in-memory state by replaying the persistor's JSONL log
// from scratch, and returns the number of events applied (as reported by
// Persistor.Replay).
//
// Each call first discards the current in-memory map and then replays the
// log, so calling Load repeatedly is safe. With a nil persistor, Load is a
// no-op that returns (0, nil) and leaves existing state untouched.
//
// The write lock is held for the whole replay. If Replay returns an error,
// the map keeps whatever events were applied before the failure. Per the
// persistor's design, malformed lines and events that fail to apply are
// not fatal: the load proceeds with the partial state it can recover.
// That skip logic lives in persistor.go; confirm the details there.
func (s *Store) Load() (int, error) {
	p := s.persistor
	if p == nil {
		return 0, nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	s.traces = make(map[string]*Trace)
	// The method value has the same func(event) error shape as the
	// callback Replay expects.
	return p.Replay(s.applyEventLocked)
}
// applyEventLocked is the single place where an event mutates the in-memory
// map. It is used both by Load (replaying the log) and by the mutating
// methods (after the event has been appended). The caller MUST hold s.mu
// for writing.
//
//   - opAdd / opRevise store e.Trace itself (not a copy) under its UID.
//     An existing entry is overwritten, so replay stays permissive with
//     older logs. A nil trace or empty UID yields ErrInvalidContent.
//   - opUpdate replaces Content; opRetire sets Retired; both set
//     UpdatedAtNs. opReplay increments ReplayCount. All three return
//     ErrNotFound when the UID is unknown.
//   - Any other op is rejected with an "unknown op" error.
//
// NOTE(review): opUpdate and opRetire stamp UpdatedAtNs with s.nowFn() at
// apply time. During Load that is the replay time, not the time of the
// original mutation, so UpdatedAtNs does not survive a restart unless the
// event type carries its own timestamp. Confirm against types.go.
func (s *Store) applyEventLocked(e event) error {
	switch e.Op {
	case opAdd, opRevise:
		if e.Trace == nil || e.Trace.UID == "" {
			return ErrInvalidContent
		}
		s.traces[e.Trace.UID] = e.Trace
		return nil

	case opUpdate, opRetire, opReplay:
		target, found := s.traces[e.UID]
		if !found {
			return ErrNotFound
		}
		switch e.Op {
		case opUpdate:
			target.Content = e.Content
			target.UpdatedAtNs = s.nowFn()
		case opRetire:
			target.Retired = true
			target.UpdatedAtNs = s.nowFn()
		default: // opReplay
			target.ReplayCount++
		}
		return nil
	}
	return errors.New("pathway: unknown op")
}
// Add stores a new trace under a freshly generated UID, with
// ReplayCount=1 and CreatedAtNs == UpdatedAtNs == now.
//
// content must be valid JSON; otherwise Add returns ErrInvalidContent.
// Both content and tags are copied, so the caller may reuse its buffers
// afterwards. If the persistor append fails, that error is returned and
// nothing is added in memory. The returned trace is a copy (matching
// Get's contract), so mutating it does not affect the stored trace.
func (s *Store) Add(content json.RawMessage, tags ...string) (*Trace, error) {
	if !json.Valid(content) {
		return nil, ErrInvalidContent
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	now := s.nowFn()
	t := &Trace{
		UID: s.uidFn(),
		// Copy the caller's buffer. Storing it directly would let later
		// writes to that slice silently rewrite the in-memory trace
		// (tags get the same protection via copyTags).
		Content:     append(json.RawMessage(nil), content...),
		CreatedAtNs: now,
		UpdatedAtNs: now,
		ReplayCount: 1,
		Tags:        copyTags(tags),
	}
	if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil {
		return nil, err
	}
	// Clone before returning so the caller can't mutate the in-memory
	// trace through the returned pointer.
	return cloneTrace(t), nil
}
// AddIdempotent stores a trace under the caller-chosen uid. If a trace with
// that uid already exists, it instead records a replay by incrementing the
// trace's ReplayCount (persisted as an opReplay event). Used by agent loops
// that want to record "I tried this same thing again."
//
// On the replay path, content and tags are ignored and the original
// content is preserved, but content must still be valid JSON. Returns
// ErrEmptyUID for an empty uid and ErrInvalidContent for invalid JSON.
// New content and tags are copied. The returned trace is always a copy;
// on replay it reflects the incremented count.
func (s *Store) AddIdempotent(uid string, content json.RawMessage, tags ...string) (*Trace, error) {
	if uid == "" {
		return nil, ErrEmptyUID
	}
	if !json.Valid(content) {
		return nil, ErrInvalidContent
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if existing, ok := s.traces[uid]; ok {
		// Replay: increment count, persist as opReplay event.
		if err := s.appendAndApplyLocked(event{Op: opReplay, UID: uid}); err != nil {
			return nil, err
		}
		// existing now carries the incremented count; hand back a copy.
		return cloneTrace(existing), nil
	}
	now := s.nowFn()
	t := &Trace{
		UID: uid,
		// Copy so the stored trace never aliases the caller's buffer.
		Content:     append(json.RawMessage(nil), content...),
		CreatedAtNs: now,
		UpdatedAtNs: now,
		ReplayCount: 1,
		Tags:        copyTags(tags),
	}
	if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil {
		return nil, err
	}
	return cloneTrace(t), nil
}
// Update replaces the content of an existing trace in place: same UID, new
// content, UpdatedAtNs bumped. Tags, ReplayCount and the retired flag are
// untouched. This is NOT a revision — use Revise when the new content
// represents a change of belief that should preserve the old one.
//
// Returns ErrEmptyUID for an empty uid, ErrInvalidContent if content is not
// valid JSON, and ErrNotFound if no trace has that uid. content is copied,
// so the caller may reuse its buffer afterwards.
func (s *Store) Update(uid string, content json.RawMessage) error {
	if uid == "" {
		return ErrEmptyUID
	}
	if !json.Valid(content) {
		return ErrInvalidContent
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.traces[uid]; !ok {
		return ErrNotFound
	}
	// The event's Content becomes the stored trace's Content on apply, so
	// it must not alias the caller's slice.
	owned := append(json.RawMessage(nil), content...)
	return s.appendAndApplyLocked(event{Op: opUpdate, UID: uid, Content: owned})
}
// Revise creates a new trace, with a fresh UID, whose PredecessorUID points
// at an existing trace. The predecessor is left unchanged and stays
// reachable via Get and History; it may itself be retired. Returns a copy
// of the new trace.
//
// Returns ErrEmptyUID for an empty predecessorUID, ErrInvalidContent if
// content is not valid JSON, and ErrPredecessorMissing if the predecessor
// does not exist. content and tags are copied.
func (s *Store) Revise(predecessorUID string, content json.RawMessage, tags ...string) (*Trace, error) {
	if predecessorUID == "" {
		return nil, ErrEmptyUID
	}
	if !json.Valid(content) {
		return nil, ErrInvalidContent
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.traces[predecessorUID]; !ok {
		return nil, ErrPredecessorMissing
	}
	now := s.nowFn()
	t := &Trace{
		UID: s.uidFn(),
		// Copy so the stored trace never aliases the caller's buffer.
		Content:        append(json.RawMessage(nil), content...),
		PredecessorUID: predecessorUID,
		CreatedAtNs:    now,
		UpdatedAtNs:    now,
		ReplayCount:    1,
		Tags:           copyTags(tags),
	}
	if err := s.appendAndApplyLocked(event{Op: opRevise, Trace: t}); err != nil {
		return nil, err
	}
	return cloneTrace(t), nil
}
// Retire flags the trace with the given uid as retired and bumps its
// UpdatedAtNs. Retired traces are hidden from Search by default but remain
// reachable through Get and History.
//
// Returns ErrEmptyUID for an empty uid and ErrNotFound for an unknown one.
// Retiring an already-retired trace is not short-circuited: another
// opRetire event is appended and UpdatedAtNs is bumped again.
func (s *Store) Retire(uid string) error {
	if uid == "" {
		return ErrEmptyUID
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if _, exists := s.traces[uid]; exists {
		return s.appendAndApplyLocked(event{Op: opRetire, UID: uid})
	}
	return ErrNotFound
}
// Get looks up a single trace by UID and returns a copy of it, or
// ErrNotFound if no trace has that UID (this includes the empty UID).
// Retired traces are returned too; deciding what to do with them is the
// caller's job.
func (s *Store) Get(uid string) (*Trace, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if stored, present := s.traces[uid]; present {
		return cloneTrace(stored), nil
	}
	return nil, ErrNotFound
}
// History returns the revision chain ending at uid, walking backward
// through PredecessorUID links. Slot 0 is the queried trace, slot 1 is its
// predecessor, and so on. Every element is a copy, and retired traces are
// included.
//
// Errors:
//   - ErrNotFound if uid is empty or no trace has that UID.
//   - ErrCycle if a UID appears twice during the walk. This only happens
//     if the persistence file was hand-edited or there is a bug elsewhere.
//
// A predecessor missing mid-chain is not an error: the walk stops and the
// chain collected so far is returned.
func (s *Store) History(uid string) ([]*Trace, error) {
	if uid == "" {
		// Without this guard the walk loop never runs and the caller gets
		// (nil, nil): a "success" with an empty chain, which the contract
		// never otherwise produces. Treat it like Get("") does.
		return nil, ErrNotFound
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	var chain []*Trace
	visited := make(map[string]struct{})
	cursor := uid
	for cursor != "" {
		if _, seen := visited[cursor]; seen {
			return nil, ErrCycle
		}
		visited[cursor] = struct{}{}
		t, ok := s.traces[cursor]
		if !ok {
			if len(chain) == 0 {
				return nil, ErrNotFound
			}
			// Predecessor missing mid-chain — return what we have.
			break
		}
		chain = append(chain, cloneTrace(t))
		cursor = t.PredecessorUID
	}
	return chain, nil
}
// Search returns copies of every trace matching all set fields of filter.
// The caller can mutate the result freely.
//
//   - Retired traces are excluded unless filter.IncludeRetired is true.
//   - Tag (if non-empty) must be one of the trace's tags (exact match).
//   - ContentContains (if non-empty) must occur as a byte substring of the
//     raw JSON content.
//   - CreatedAfterNs / CreatedBeforeNs (if > 0) bound CreatedAtNs
//     inclusively.
//
// The result order is unspecified, because it follows map iteration. The
// result is nil when nothing matches.
func (s *Store) Search(filter SearchFilter) []*Trace {
	// Convert the substring needle once, instead of once per trace inside
	// the loop.
	var needle []byte
	if filter.ContentContains != "" {
		needle = []byte(filter.ContentContains)
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []*Trace
	for _, t := range s.traces {
		if t.Retired && !filter.IncludeRetired {
			continue
		}
		if filter.Tag != "" && !containsTag(t.Tags, filter.Tag) {
			continue
		}
		if needle != nil && !bytes.Contains(t.Content, needle) {
			continue
		}
		if filter.CreatedAfterNs > 0 && t.CreatedAtNs < filter.CreatedAfterNs {
			continue
		}
		if filter.CreatedBeforeNs > 0 && t.CreatedAtNs > filter.CreatedBeforeNs {
			continue
		}
		out = append(out, cloneTrace(t))
	}
	return out
}
// Stats is a point-in-time snapshot of how many traces a Store currently
// holds, for /stats endpoints and operator dashboards. These are current
// counts, not lifetime counters. Total always equals Active + Retired.
type Stats struct {
	Total   int // every trace in the store, retired or not
	Active  int // traces whose Retired flag is false
	Retired int // traces whose Retired flag is true
}

// Stats counts the traces currently held in memory, split by retirement
// state, under the read lock.
func (s *Store) Stats() Stats {
	s.mu.RLock()
	defer s.mu.RUnlock()

	retired := 0
	for _, tr := range s.traces {
		if tr.Retired {
			retired++
		}
	}
	total := len(s.traces)
	return Stats{
		Total:   total,
		Active:  total - retired,
		Retired: retired,
	}
}
// appendAndApplyLocked is the single write path for mutations. It first
// persists the event (when a persistor is configured) and only then
// applies it in memory, so a crash mid-mutation can't leave in-memory
// state ahead of the log. If the append fails, its error is returned and
// memory is untouched. The caller must hold s.mu for writing.
//
// If the append succeeds but the apply fails, the log holds an event that
// memory does not reflect. The public mutators avoid this by validating
// existence before calling in.
func (s *Store) appendAndApplyLocked(e event) error {
	var persistErr error
	if s.persistor != nil {
		persistErr = s.persistor.Append(e)
	}
	if persistErr != nil {
		return persistErr
	}
	return s.applyEventLocked(e)
}
// cloneTrace returns a copy of t whose Content and Tags have their own
// backing arrays, so callers can't mutate the in-memory trace through the
// returned pointer. A nil t yields nil instead of panicking.
//
// The clone preserves the nil-vs-empty distinction of both slices: an
// empty but non-nil Tags stays empty rather than becoming nil. Without
// this, a trace replayed with "tags": [] could re-encode as null, unless
// the field is tagged omitempty.
//
// NOTE(review): only Content and Tags are copied explicitly. Any other
// reference-typed fields Trace has, or gains later, in types.go would
// still be shared with the stored trace. Keep this function in sync.
func cloneTrace(t *Trace) *Trace {
	if t == nil {
		return nil
	}
	c := *t
	// x[:0:0] keeps x's nil-ness but has zero capacity, so append must
	// allocate a fresh backing array whenever there is data to copy (the
	// same trick slices.Clone uses).
	c.Content = append(t.Content[:0:0], t.Content...)
	c.Tags = append(t.Tags[:0:0], t.Tags...)
	return &c
}
// copyTags returns an independent copy of in, or nil when in is empty.
// The copy means a stored Trace never aliases the caller's variadic tag
// slice, and an untagged trace always ends up with nil Tags.
func copyTags(in []string) []string {
	if len(in) > 0 {
		return append(make([]string, 0, len(in)), in...)
	}
	return nil
}
// containsTag reports whether want appears in tags, using an exact,
// case-sensitive string comparison. A nil or empty tags slice never
// contains anything.
func containsTag(tags []string, want string) bool {
	found := false
	for i := 0; i < len(tags) && !found; i++ {
		found = tags[i] == want
	}
	return found
}