Closes Sprint 2 design-bar work (audit reports/scrum/sprint-backlog.md):
S2.1 — ADR-004 documents the pathway-memory data model
S2.2 — pathway port lands with deterministic fixture corpus
and full test coverage on day one
S2.3 — retired traces are excluded from retrieval (test
passes; would fail without the filter)
Mem0-style operations: Add / AddIdempotent / Update / Revise /
Retire / Get / History / Search. Each operation is a method on
Store; persistence is JSONL append-only with corruption recovery
on Replay.
internal/pathway/types.go Trace + event + SearchFilter + sentinel errors
internal/pathway/store.go in-memory state + RWMutex + ops
internal/pathway/persistor.go JSONL append-only log with replay
internal/pathway/store_test.go 20 test funcs covering all 7
Sprint 2 claim rows + concurrency
internal/pathway/persistor_test.go 6 test funcs covering missing-
file, corruption recovery, long-line
handling, parent-dir auto-create,
apply-error skip behavior
Sprint 2 claim coverage row-by-row:
ADD TestAdd_AssignsUIDAndTimestamps + TestAdd_RejectsInvalidJSON
UPDATE TestUpdate_ReplacesContentSameUID + TestUpdate_MissingUID_Errors
REVISE TestRevise_LinksToPredecessorViaHistory +
TestRevise_PredecessorMissing_Errors +
TestRevise_ChainOfThree_BackwardWalk
RETIRE TestRetire_ExcludedFromSearch +
TestRetire_StillAccessibleViaGet +
TestRetire_StillAccessibleViaHistory
HISTORY/cycle TestHistory_CycleDetected (injected via internal map),
TestHistory_PredecessorMissing_TruncatesChain,
TestHistory_UnknownUID_ErrorsClean
REPLAY/dup TestAddIdempotent_IncrementsReplayCount (locks the
"replay preserves original content" rule per ADR-004)
CORRUPTION TestPersistor_CorruptedLines_Skipped +
TestPersistor_ApplyError_Skipped
ROUND-TRIP TestPersistor_RoundTrip locks the full Save → fresh
Store → Load → Stats-match contract
Two real bugs caught during testing:
- Add returned the same *Trace stored in the map, so callers
holding a reference saw later mutations. Fixed: clone before
return (matches Get's contract). Same fix in AddIdempotent
+ Revise.
- Test typo: {"v":different} isn't valid JSON; AddIdempotent's
json.Valid rejected it as ErrInvalidContent. Test fixed to
use {"v":"different"}; the validation behavior is correct.
Skipped this commit (next):
- cmd/pathwayd HTTP binary
- gateway routing for /v1/pathway/*
- end-to-end smoke
These add the wire surface; the substrate ships first so the
wire layer can be a pure proxy in the next commit.
Verified:
go test -count=1 ./internal/pathway/ — 26 tests green
just verify — vet + test + 9 smokes 34s
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
382 lines
9.8 KiB
Go
// store.go — the in-memory side of pathway memory. Persistence
// (load/append-on-mutate) is in persistor.go; the Store can be
// constructed without persistence for tests and ephemeral uses.
|
|
|
|
package pathway
|
|
|
|
import (
	"bytes"
	"encoding/json"
	"errors"
	"sort"
	"sync"
	"time"

	"github.com/google/uuid"
)
|
|
|
|
// Store is the in-memory pathway memory. Thread-safe via a single
// RWMutex (read-heavy workloads are the norm; mutations are
// individual operations not hot loops).
//
// Construct with NewStore: the zero value has a nil traces map and nil
// nowFn/uidFn, so the mutating methods would panic on it.
type Store struct {
	// mu guards traces. Readers (Get/History/Search/Stats) take the read
	// lock; every mutation and Load take the write lock.
	mu sync.RWMutex

	// traces[uid] → *Trace. Single map covers both retired and
	// active traces; Search filters retired by default. Nothing in
	// Store deletes entries, so retired traces stay here.
	traces map[string]*Trace

	// persistor is optional — nil = in-memory only (test mode
	// and ephemeral G2 uses).
	persistor *Persistor

	// nowFn returns "the current time in nanoseconds" — overridden
	// in tests for deterministic timestamps.
	nowFn func() int64

	// uidFn generates new UIDs — overridden in tests for
	// deterministic UID sequences.
	uidFn func() string
}
|
|
|
|
// NewStore builds an empty Store. Pass nil persistor for in-memory
|
|
// mode. The returned store is ready to receive operations; if
|
|
// persistor is non-nil, call Load(ctx) before issuing operations to
|
|
// rehydrate prior state.
|
|
func NewStore(persistor *Persistor) *Store {
|
|
return &Store{
|
|
traces: make(map[string]*Trace),
|
|
persistor: persistor,
|
|
nowFn: func() int64 { return time.Now().UnixNano() },
|
|
uidFn: func() string { return uuid.New().String() },
|
|
}
|
|
}
|
|
|
|
// Load replays the persistor's JSONL log and rebuilds in-memory
|
|
// state. Safe to call multiple times — each call resets the in-
|
|
// memory state to whatever the log says. Corruption (malformed
|
|
// lines, broken events) is logged-not-fatal: the load proceeds
|
|
// with the partial state it can recover.
|
|
//
|
|
// Returns the number of events successfully applied.
|
|
func (s *Store) Load() (int, error) {
|
|
if s.persistor == nil {
|
|
return 0, nil
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.traces = make(map[string]*Trace) // reset
|
|
return s.persistor.Replay(func(e event) error {
|
|
return s.applyEventLocked(e)
|
|
})
|
|
}
|
|
|
|
// applyEventLocked is the single point where events update the
|
|
// in-memory map. Used by both Load (replaying log) and the
|
|
// mutating methods (after appending to the log). Caller MUST hold
|
|
// s.mu in write mode.
|
|
func (s *Store) applyEventLocked(e event) error {
|
|
switch e.Op {
|
|
case opAdd, opRevise:
|
|
if e.Trace == nil || e.Trace.UID == "" {
|
|
return ErrInvalidContent
|
|
}
|
|
// Add semantics: if UID already exists, this should have been
|
|
// a replay — but be permissive on Replay to handle older logs.
|
|
s.traces[e.Trace.UID] = e.Trace
|
|
return nil
|
|
case opUpdate:
|
|
t, ok := s.traces[e.UID]
|
|
if !ok {
|
|
return ErrNotFound
|
|
}
|
|
t.Content = e.Content
|
|
t.UpdatedAtNs = s.nowFn()
|
|
return nil
|
|
case opRetire:
|
|
t, ok := s.traces[e.UID]
|
|
if !ok {
|
|
return ErrNotFound
|
|
}
|
|
t.Retired = true
|
|
t.UpdatedAtNs = s.nowFn()
|
|
return nil
|
|
case opReplay:
|
|
t, ok := s.traces[e.UID]
|
|
if !ok {
|
|
return ErrNotFound
|
|
}
|
|
t.ReplayCount++
|
|
return nil
|
|
default:
|
|
return errors.New("pathway: unknown op")
|
|
}
|
|
}
|
|
|
|
// Add stores a new trace with a fresh UID and replay_count=1.
|
|
// Returns the stored trace (with UID + timestamps populated).
|
|
func (s *Store) Add(content json.RawMessage, tags ...string) (*Trace, error) {
|
|
if !json.Valid(content) {
|
|
return nil, ErrInvalidContent
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
now := s.nowFn()
|
|
t := &Trace{
|
|
UID: s.uidFn(),
|
|
Content: content,
|
|
CreatedAtNs: now,
|
|
UpdatedAtNs: now,
|
|
ReplayCount: 1,
|
|
Tags: copyTags(tags),
|
|
}
|
|
if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil {
|
|
return nil, err
|
|
}
|
|
// Clone before returning so the caller can't mutate the in-memory
|
|
// trace through the returned pointer (matches Get's contract).
|
|
return cloneTrace(t), nil
|
|
}
|
|
|
|
// AddIdempotent stores a trace under the given UID, OR — if the
|
|
// UID already exists — increments its ReplayCount. Used by agent
|
|
// loops that want to record "I tried this same thing again."
|
|
func (s *Store) AddIdempotent(uid string, content json.RawMessage, tags ...string) (*Trace, error) {
|
|
if uid == "" {
|
|
return nil, ErrEmptyUID
|
|
}
|
|
if !json.Valid(content) {
|
|
return nil, ErrInvalidContent
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if existing, ok := s.traces[uid]; ok {
|
|
// Replay: increment count, persist as opReplay event.
|
|
if err := s.appendAndApplyLocked(event{Op: opReplay, UID: uid}); err != nil {
|
|
return nil, err
|
|
}
|
|
// Return a copy to avoid the caller mutating the in-memory
|
|
// trace through the returned pointer.
|
|
return cloneTrace(existing), nil
|
|
}
|
|
|
|
now := s.nowFn()
|
|
t := &Trace{
|
|
UID: uid,
|
|
Content: content,
|
|
CreatedAtNs: now,
|
|
UpdatedAtNs: now,
|
|
ReplayCount: 1,
|
|
Tags: copyTags(tags),
|
|
}
|
|
if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil {
|
|
return nil, err
|
|
}
|
|
return cloneTrace(t), nil
|
|
}
|
|
|
|
// Update replaces the content of an existing trace. Same UID, new
|
|
// content. NOT a revision — use Revise when the new content
|
|
// represents a change-of-belief that should preserve the old.
|
|
func (s *Store) Update(uid string, content json.RawMessage) error {
|
|
if uid == "" {
|
|
return ErrEmptyUID
|
|
}
|
|
if !json.Valid(content) {
|
|
return ErrInvalidContent
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if _, ok := s.traces[uid]; !ok {
|
|
return ErrNotFound
|
|
}
|
|
return s.appendAndApplyLocked(event{Op: opUpdate, UID: uid, Content: content})
|
|
}
|
|
|
|
// Revise creates a new trace whose PredecessorUID points at an
|
|
// existing trace. Old trace stays accessible via Get and History.
|
|
// Returns the new trace.
|
|
func (s *Store) Revise(predecessorUID string, content json.RawMessage, tags ...string) (*Trace, error) {
|
|
if predecessorUID == "" {
|
|
return nil, ErrEmptyUID
|
|
}
|
|
if !json.Valid(content) {
|
|
return nil, ErrInvalidContent
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if _, ok := s.traces[predecessorUID]; !ok {
|
|
return nil, ErrPredecessorMissing
|
|
}
|
|
now := s.nowFn()
|
|
t := &Trace{
|
|
UID: s.uidFn(),
|
|
Content: content,
|
|
PredecessorUID: predecessorUID,
|
|
CreatedAtNs: now,
|
|
UpdatedAtNs: now,
|
|
ReplayCount: 1,
|
|
Tags: copyTags(tags),
|
|
}
|
|
if err := s.appendAndApplyLocked(event{Op: opRevise, Trace: t}); err != nil {
|
|
return nil, err
|
|
}
|
|
return cloneTrace(t), nil
|
|
}
|
|
|
|
// Retire marks a trace as retired. Retired traces are excluded
|
|
// from Search by default but accessible via Get and History.
|
|
func (s *Store) Retire(uid string) error {
|
|
if uid == "" {
|
|
return ErrEmptyUID
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if _, ok := s.traces[uid]; !ok {
|
|
return ErrNotFound
|
|
}
|
|
return s.appendAndApplyLocked(event{Op: opRetire, UID: uid})
|
|
}
|
|
|
|
// Get returns a copy of the trace with the given UID. Includes
|
|
// retired traces (caller decides what to do with them).
|
|
func (s *Store) Get(uid string) (*Trace, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
t, ok := s.traces[uid]
|
|
if !ok {
|
|
return nil, ErrNotFound
|
|
}
|
|
return cloneTrace(t), nil
|
|
}
|
|
|
|
// History returns the chain of traces from this UID backward
|
|
// through PredecessorUID links. Slot 0 is the queried trace; slot
|
|
// 1 is its predecessor; and so on. Cycle-safe: a UID that appears
|
|
// twice during the walk returns ErrCycle (only happens if the
|
|
// persistence file was hand-edited or there's a bug elsewhere).
|
|
func (s *Store) History(uid string) ([]*Trace, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
var chain []*Trace
|
|
visited := make(map[string]struct{})
|
|
cursor := uid
|
|
for cursor != "" {
|
|
if _, seen := visited[cursor]; seen {
|
|
return nil, ErrCycle
|
|
}
|
|
visited[cursor] = struct{}{}
|
|
|
|
t, ok := s.traces[cursor]
|
|
if !ok {
|
|
if len(chain) == 0 {
|
|
return nil, ErrNotFound
|
|
}
|
|
// Predecessor missing mid-chain — return what we have.
|
|
break
|
|
}
|
|
chain = append(chain, cloneTrace(t))
|
|
cursor = t.PredecessorUID
|
|
}
|
|
return chain, nil
|
|
}
|
|
|
|
// Search returns traces matching the filter. Excludes retired by
|
|
// default; pass IncludeRetired: true to include them. Returns a
|
|
// new slice of trace copies — caller can mutate freely.
|
|
func (s *Store) Search(filter SearchFilter) []*Trace {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
var out []*Trace
|
|
for _, t := range s.traces {
|
|
if t.Retired && !filter.IncludeRetired {
|
|
continue
|
|
}
|
|
if filter.Tag != "" && !containsTag(t.Tags, filter.Tag) {
|
|
continue
|
|
}
|
|
if filter.ContentContains != "" &&
|
|
!bytes.Contains(t.Content, []byte(filter.ContentContains)) {
|
|
continue
|
|
}
|
|
if filter.CreatedAfterNs > 0 && t.CreatedAtNs < filter.CreatedAfterNs {
|
|
continue
|
|
}
|
|
if filter.CreatedBeforeNs > 0 && t.CreatedAtNs > filter.CreatedBeforeNs {
|
|
continue
|
|
}
|
|
out = append(out, cloneTrace(t))
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Stats is a point-in-time summary of the store's traces, useful for
// /stats endpoints and operator dashboards. Nothing in Store deletes
// traces, so Total covers every trace the store knows about, retired or
// not. Total always equals Active + Retired.
type Stats struct {
	// Total is the number of traces held.
	Total int
	// Active counts traces not marked retired.
	Active int
	// Retired counts traces marked retired.
	Retired int
}
|
|
|
|
func (s *Store) Stats() Stats {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
st := Stats{Total: len(s.traces)}
|
|
for _, t := range s.traces {
|
|
if t.Retired {
|
|
st.Retired++
|
|
} else {
|
|
st.Active++
|
|
}
|
|
}
|
|
return st
|
|
}
|
|
|
|
// appendAndApplyLocked is the single-point write path: persist the
|
|
// event first (so a crash mid-mutation doesn't leave in-memory
|
|
// state ahead of the log), then apply it in memory. Caller holds
|
|
// s.mu in write mode.
|
|
func (s *Store) appendAndApplyLocked(e event) error {
|
|
if s.persistor != nil {
|
|
if err := s.persistor.Append(e); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return s.applyEventLocked(e)
|
|
}
|
|
|
|
// cloneTrace returns a deep copy so callers can't mutate the
|
|
// in-memory trace through the returned pointer.
|
|
func cloneTrace(t *Trace) *Trace {
|
|
c := *t
|
|
if t.Content != nil {
|
|
c.Content = append(json.RawMessage(nil), t.Content...)
|
|
}
|
|
if t.Tags != nil {
|
|
c.Tags = append([]string(nil), t.Tags...)
|
|
}
|
|
return &c
|
|
}
|
|
|
|
// copyTags returns an independent copy of in. A later change to the
// caller's slice (e.g. a reused variadic backing array) therefore cannot
// leak into a stored trace. Nil or empty input yields nil.
func copyTags(in []string) []string {
	if len(in) > 0 {
		return append(make([]string, 0, len(in)), in...)
	}
	return nil
}
|
|
|
|
// containsTag reports whether want appears in tags as an exact,
// case-sensitive match. A linear scan is fine because traces carry few
// tags.
func containsTag(tags []string, want string) bool {
	for i := range tags {
		if tags[i] == want {
			return true
		}
	}
	return false
}
|