// store.go — the in-memory side of pathway memory. Persistence
// (load/append-on-mutate) is in persistor.go; the Store can be
// constructed without persistence for tests and ephemeral uses.
package pathway

import (
	"bytes"
	"encoding/json"
	"errors"
	"sync"
	"time"

	"github.com/google/uuid"
)

// Store is the in-memory pathway memory. Thread-safe via a single
// RWMutex (read-heavy workloads are the norm; mutations are
// individual operations not hot loops).
type Store struct {
	mu sync.RWMutex

	// traces[uid] → *Trace. Single map covers both retired and
	// active traces; Search filters retired by default.
	traces map[string]*Trace

	// persistor is optional — nil = in-memory only (test mode
	// and ephemeral G2 uses).
	persistor *Persistor

	// nowFn returns "the current time in nanoseconds" — overridden
	// in tests for deterministic timestamps.
	nowFn func() int64

	// uidFn generates new UIDs — overridden in tests for
	// deterministic UID sequences.
	uidFn func() string
}

// NewStore builds an empty Store. Pass nil persistor for in-memory
// mode. The returned store is ready to receive operations; if
// persistor is non-nil, call Load() before issuing operations to
// rehydrate prior state.
func NewStore(persistor *Persistor) *Store {
	return &Store{
		traces:    make(map[string]*Trace),
		persistor: persistor,
		nowFn:     func() int64 { return time.Now().UnixNano() },
		uidFn:     func() string { return uuid.New().String() },
	}
}

// Load replays the persistor's JSONL log and rebuilds in-memory
// state. Safe to call multiple times — each call resets the in-
// memory state to whatever the log says. Corruption (malformed
// lines, broken events) is logged-not-fatal: the load proceeds
// with the partial state it can recover.
//
// Returns the number of events successfully applied.
func (s *Store) Load() (int, error) { if s.persistor == nil { return 0, nil } s.mu.Lock() defer s.mu.Unlock() s.traces = make(map[string]*Trace) // reset return s.persistor.Replay(func(e event) error { return s.applyEventLocked(e) }) } // applyEventLocked is the single point where events update the // in-memory map. Used by both Load (replaying log) and the // mutating methods (after appending to the log). Caller MUST hold // s.mu in write mode. func (s *Store) applyEventLocked(e event) error { switch e.Op { case opAdd, opRevise: if e.Trace == nil || e.Trace.UID == "" { return ErrInvalidContent } // Add semantics: if UID already exists, this should have been // a replay — but be permissive on Replay to handle older logs. s.traces[e.Trace.UID] = e.Trace return nil case opUpdate: t, ok := s.traces[e.UID] if !ok { return ErrNotFound } t.Content = e.Content t.UpdatedAtNs = s.nowFn() return nil case opRetire: t, ok := s.traces[e.UID] if !ok { return ErrNotFound } t.Retired = true t.UpdatedAtNs = s.nowFn() return nil case opReplay: t, ok := s.traces[e.UID] if !ok { return ErrNotFound } t.ReplayCount++ return nil default: return errors.New("pathway: unknown op") } } // Add stores a new trace with a fresh UID and replay_count=1. // Returns the stored trace (with UID + timestamps populated). func (s *Store) Add(content json.RawMessage, tags ...string) (*Trace, error) { if !json.Valid(content) { return nil, ErrInvalidContent } s.mu.Lock() defer s.mu.Unlock() now := s.nowFn() t := &Trace{ UID: s.uidFn(), Content: content, CreatedAtNs: now, UpdatedAtNs: now, ReplayCount: 1, Tags: copyTags(tags), } if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil { return nil, err } // Clone before returning so the caller can't mutate the in-memory // trace through the returned pointer (matches Get's contract). return cloneTrace(t), nil } // AddIdempotent stores a trace under the given UID, OR — if the // UID already exists — increments its ReplayCount. 
Used by agent // loops that want to record "I tried this same thing again." func (s *Store) AddIdempotent(uid string, content json.RawMessage, tags ...string) (*Trace, error) { if uid == "" { return nil, ErrEmptyUID } if !json.Valid(content) { return nil, ErrInvalidContent } s.mu.Lock() defer s.mu.Unlock() if existing, ok := s.traces[uid]; ok { // Replay: increment count, persist as opReplay event. if err := s.appendAndApplyLocked(event{Op: opReplay, UID: uid}); err != nil { return nil, err } // Return a copy to avoid the caller mutating the in-memory // trace through the returned pointer. return cloneTrace(existing), nil } now := s.nowFn() t := &Trace{ UID: uid, Content: content, CreatedAtNs: now, UpdatedAtNs: now, ReplayCount: 1, Tags: copyTags(tags), } if err := s.appendAndApplyLocked(event{Op: opAdd, Trace: t}); err != nil { return nil, err } return cloneTrace(t), nil } // Update replaces the content of an existing trace. Same UID, new // content. NOT a revision — use Revise when the new content // represents a change-of-belief that should preserve the old. func (s *Store) Update(uid string, content json.RawMessage) error { if uid == "" { return ErrEmptyUID } if !json.Valid(content) { return ErrInvalidContent } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.traces[uid]; !ok { return ErrNotFound } return s.appendAndApplyLocked(event{Op: opUpdate, UID: uid, Content: content}) } // Revise creates a new trace whose PredecessorUID points at an // existing trace. Old trace stays accessible via Get and History. // Returns the new trace. 
func (s *Store) Revise(predecessorUID string, content json.RawMessage, tags ...string) (*Trace, error) { if predecessorUID == "" { return nil, ErrEmptyUID } if !json.Valid(content) { return nil, ErrInvalidContent } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.traces[predecessorUID]; !ok { return nil, ErrPredecessorMissing } now := s.nowFn() t := &Trace{ UID: s.uidFn(), Content: content, PredecessorUID: predecessorUID, CreatedAtNs: now, UpdatedAtNs: now, ReplayCount: 1, Tags: copyTags(tags), } if err := s.appendAndApplyLocked(event{Op: opRevise, Trace: t}); err != nil { return nil, err } return cloneTrace(t), nil } // Retire marks a trace as retired. Retired traces are excluded // from Search by default but accessible via Get and History. func (s *Store) Retire(uid string) error { if uid == "" { return ErrEmptyUID } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.traces[uid]; !ok { return ErrNotFound } return s.appendAndApplyLocked(event{Op: opRetire, UID: uid}) } // Get returns a copy of the trace with the given UID. Includes // retired traces (caller decides what to do with them). func (s *Store) Get(uid string) (*Trace, error) { s.mu.RLock() defer s.mu.RUnlock() t, ok := s.traces[uid] if !ok { return nil, ErrNotFound } return cloneTrace(t), nil } // History returns the chain of traces from this UID backward // through PredecessorUID links. Slot 0 is the queried trace; slot // 1 is its predecessor; and so on. Cycle-safe: a UID that appears // twice during the walk returns ErrCycle (only happens if the // persistence file was hand-edited or there's a bug elsewhere). func (s *Store) History(uid string) ([]*Trace, error) { s.mu.RLock() defer s.mu.RUnlock() var chain []*Trace visited := make(map[string]struct{}) cursor := uid for cursor != "" { if _, seen := visited[cursor]; seen { return nil, ErrCycle } visited[cursor] = struct{}{} t, ok := s.traces[cursor] if !ok { if len(chain) == 0 { return nil, ErrNotFound } // Predecessor missing mid-chain — return what we have. 
break } chain = append(chain, cloneTrace(t)) cursor = t.PredecessorUID } return chain, nil } // Search returns traces matching the filter. Excludes retired by // default; pass IncludeRetired: true to include them. Returns a // new slice of trace copies — caller can mutate freely. func (s *Store) Search(filter SearchFilter) []*Trace { s.mu.RLock() defer s.mu.RUnlock() var out []*Trace for _, t := range s.traces { if t.Retired && !filter.IncludeRetired { continue } if filter.Tag != "" && !containsTag(t.Tags, filter.Tag) { continue } if filter.ContentContains != "" && !bytes.Contains(t.Content, []byte(filter.ContentContains)) { continue } if filter.CreatedAfterNs > 0 && t.CreatedAtNs < filter.CreatedAfterNs { continue } if filter.CreatedBeforeNs > 0 && t.CreatedAtNs > filter.CreatedBeforeNs { continue } out = append(out, cloneTrace(t)) } return out } // Stats returns lifetime counters useful for /stats endpoints and // operator dashboards. type Stats struct { Total int Active int Retired int } func (s *Store) Stats() Stats { s.mu.RLock() defer s.mu.RUnlock() st := Stats{Total: len(s.traces)} for _, t := range s.traces { if t.Retired { st.Retired++ } else { st.Active++ } } return st } // appendAndApplyLocked is the single-point write path: persist the // event first (so a crash mid-mutation doesn't leave in-memory // state ahead of the log), then apply it in memory. Caller holds // s.mu in write mode. func (s *Store) appendAndApplyLocked(e event) error { if s.persistor != nil { if err := s.persistor.Append(e); err != nil { return err } } return s.applyEventLocked(e) } // cloneTrace returns a deep copy so callers can't mutate the // in-memory trace through the returned pointer. func cloneTrace(t *Trace) *Trace { c := *t if t.Content != nil { c.Content = append(json.RawMessage(nil), t.Content...) } if t.Tags != nil { c.Tags = append([]string(nil), t.Tags...) 
} return &c } func copyTags(in []string) []string { if len(in) == 0 { return nil } out := make([]string, len(in)) copy(out, in) return out } func containsTag(tags []string, want string) bool { for _, t := range tags { if t == want { return true } } return false }