Closes Sprint 2 design-bar work (audit reports/scrum/sprint-backlog.md):
S2.1 — ADR-004 documents the pathway-memory data model
S2.2 — pathway port lands with deterministic fixture corpus
and full test coverage on day one
S2.3 — retired traces are excluded from retrieval (test
passes; would fail without the filter)
Mem0-style operations: Add / AddIdempotent / Update / Revise /
Retire / Get / History / Search. Each operation is a method on
Store; persistence is JSONL append-only with corruption recovery
on Replay.
internal/pathway/types.go Trace + event + SearchFilter + sentinel errors
internal/pathway/store.go in-memory state + RWMutex + ops
internal/pathway/persistor.go JSONL append-only log with replay
internal/pathway/store_test.go 20 test funcs covering all 7
Sprint 2 claim rows + concurrency
internal/pathway/persistor_test.go 6 test funcs covering missing-
file, corruption recovery, long-line
handling, parent-dir auto-create,
apply-error skip behavior
Sprint 2 claim coverage row-by-row:
ADD TestAdd_AssignsUIDAndTimestamps + TestAdd_RejectsInvalidJSON
UPDATE TestUpdate_ReplacesContentSameUID + TestUpdate_MissingUID_Errors
REVISE TestRevise_LinksToPredecessorViaHistory +
TestRevise_PredecessorMissing_Errors +
TestRevise_ChainOfThree_BackwardWalk
RETIRE TestRetire_ExcludedFromSearch +
TestRetire_StillAccessibleViaGet +
TestRetire_StillAccessibleViaHistory
HISTORY/cycle TestHistory_CycleDetected (injected via internal map),
TestHistory_PredecessorMissing_TruncatesChain,
TestHistory_UnknownUID_ErrorsClean
REPLAY/dup TestAddIdempotent_IncrementsReplayCount (locks the
"replay preserves original content" rule per ADR-004)
CORRUPTION TestPersistor_CorruptedLines_Skipped +
TestPersistor_ApplyError_Skipped
ROUND-TRIP TestPersistor_RoundTrip locks the full Save → fresh
Store → Load → Stats-match contract
Two real bugs caught during testing:
- Add returned the same *Trace stored in the map, so callers
holding a reference saw later mutations. Fixed: clone before
return (matches Get's contract). Same fix in AddIdempotent
+ Revise.
- Test typo: {"v":different} isn't valid JSON; AddIdempotent's
json.Valid rejected it as ErrInvalidContent. Test fixed to
use {"v":"different"}; the validation behavior is correct.
Skipped this commit (next):
- cmd/pathwayd HTTP binary
- gateway routing for /v1/pathway/*
- end-to-end smoke
These add the wire surface; the substrate ships first so the
wire layer can be a pure proxy in the next commit.
Verified:
go test -count=1 ./internal/pathway/ — 26 tests green
just verify — vet + test + 9 smokes 34s
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
399 lines
11 KiB
Go
399 lines
11 KiB
Go
package pathway
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
// Closes Sprint 2 design-bar work from the audit. Tests cover all 7
|
|
// claim rows from claim-coverage-table.md: ADD, UPDATE, REVISE,
|
|
// RETIRE, HISTORY chain cycle-safe, replay-count duplicate ADD,
|
|
// corrupted memory row recovery (persistor_test.go).
|
|
|
|
// newTestStore returns an in-memory Store with deterministic UID +
|
|
// time generation for repeatable assertions.
|
|
func newTestStore(t *testing.T) *Store {
|
|
t.Helper()
|
|
s := NewStore(nil)
|
|
var counter int
|
|
var clock int64
|
|
s.uidFn = func() string {
|
|
counter++
|
|
return "uid-" + strconv.Itoa(counter)
|
|
}
|
|
s.nowFn = func() int64 {
|
|
clock++
|
|
return clock
|
|
}
|
|
return s
|
|
}
|
|
|
|
func newPersistedStore(t *testing.T) (*Store, string) {
|
|
t.Helper()
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, "pathway.jsonl")
|
|
p, err := NewPersistor(path)
|
|
if err != nil {
|
|
t.Fatalf("NewPersistor: %v", err)
|
|
}
|
|
s := NewStore(p)
|
|
var counter int
|
|
var clock int64
|
|
s.uidFn = func() string {
|
|
counter++
|
|
return "uid-" + strconv.Itoa(counter)
|
|
}
|
|
s.nowFn = func() int64 {
|
|
clock++
|
|
return clock
|
|
}
|
|
return s, path
|
|
}
|
|
|
|
// ── Sprint 2 row 1: ADD a new pathway trace ────────────────────
|
|
|
|
func TestAdd_AssignsUIDAndTimestamps(t *testing.T) {
|
|
s := newTestStore(t)
|
|
tr, err := s.Add(json.RawMessage(`{"k":"v"}`), "tag-a")
|
|
if err != nil {
|
|
t.Fatalf("Add: %v", err)
|
|
}
|
|
if tr.UID != "uid-1" {
|
|
t.Errorf("UID = %q, want uid-1", tr.UID)
|
|
}
|
|
if tr.ReplayCount != 1 {
|
|
t.Errorf("ReplayCount = %d, want 1", tr.ReplayCount)
|
|
}
|
|
if tr.Retired {
|
|
t.Error("freshly-added trace should NOT be retired")
|
|
}
|
|
if tr.CreatedAtNs == 0 || tr.UpdatedAtNs == 0 {
|
|
t.Error("timestamps unset")
|
|
}
|
|
if len(tr.Tags) != 1 || tr.Tags[0] != "tag-a" {
|
|
t.Errorf("Tags = %v, want [tag-a]", tr.Tags)
|
|
}
|
|
}
|
|
|
|
func TestAdd_RejectsInvalidJSON(t *testing.T) {
|
|
s := newTestStore(t)
|
|
_, err := s.Add(json.RawMessage(`not json`))
|
|
if !errors.Is(err, ErrInvalidContent) {
|
|
t.Errorf("expected ErrInvalidContent, got %v", err)
|
|
}
|
|
}
|
|
|
|
// ── Sprint 2 row 2: UPDATE replaces existing trace by uid ──────
|
|
|
|
func TestUpdate_ReplacesContentSameUID(t *testing.T) {
|
|
s := newTestStore(t)
|
|
tr, _ := s.Add(json.RawMessage(`{"v":1}`))
|
|
|
|
if err := s.Update(tr.UID, json.RawMessage(`{"v":2}`)); err != nil {
|
|
t.Fatalf("Update: %v", err)
|
|
}
|
|
|
|
got, _ := s.Get(tr.UID)
|
|
if string(got.Content) != `{"v":2}` {
|
|
t.Errorf("content = %s, want updated", got.Content)
|
|
}
|
|
if got.UpdatedAtNs == tr.UpdatedAtNs {
|
|
t.Error("UpdatedAtNs should bump on Update")
|
|
}
|
|
}
|
|
|
|
func TestUpdate_MissingUID_Errors(t *testing.T) {
|
|
s := newTestStore(t)
|
|
err := s.Update("nonexistent", json.RawMessage(`{}`))
|
|
if !errors.Is(err, ErrNotFound) {
|
|
t.Errorf("expected ErrNotFound, got %v", err)
|
|
}
|
|
}
|
|
|
|
// ── Sprint 2 row 3: REVISE creates a new revision linked via history ──
|
|
|
|
func TestRevise_LinksToPredecessorViaHistory(t *testing.T) {
|
|
s := newTestStore(t)
|
|
root, _ := s.Add(json.RawMessage(`{"v":1}`))
|
|
rev, err := s.Revise(root.UID, json.RawMessage(`{"v":2}`))
|
|
if err != nil {
|
|
t.Fatalf("Revise: %v", err)
|
|
}
|
|
if rev.PredecessorUID != root.UID {
|
|
t.Errorf("PredecessorUID = %q, want %q", rev.PredecessorUID, root.UID)
|
|
}
|
|
if rev.UID == root.UID {
|
|
t.Error("Revise must produce a NEW UID")
|
|
}
|
|
}
|
|
|
|
func TestRevise_PredecessorMissing_Errors(t *testing.T) {
|
|
s := newTestStore(t)
|
|
_, err := s.Revise("ghost-uid", json.RawMessage(`{}`))
|
|
if !errors.Is(err, ErrPredecessorMissing) {
|
|
t.Errorf("expected ErrPredecessorMissing, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestRevise_ChainOfThree_BackwardWalk(t *testing.T) {
|
|
s := newTestStore(t)
|
|
a, _ := s.Add(json.RawMessage(`{"v":1}`))
|
|
b, _ := s.Revise(a.UID, json.RawMessage(`{"v":2}`))
|
|
c, _ := s.Revise(b.UID, json.RawMessage(`{"v":3}`))
|
|
|
|
chain, err := s.History(c.UID)
|
|
if err != nil {
|
|
t.Fatalf("History: %v", err)
|
|
}
|
|
want := []string{c.UID, b.UID, a.UID}
|
|
if len(chain) != 3 {
|
|
t.Fatalf("chain length = %d, want 3", len(chain))
|
|
}
|
|
for i, tr := range chain {
|
|
if tr.UID != want[i] {
|
|
t.Errorf("chain[%d].UID = %q, want %q", i, tr.UID, want[i])
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── Sprint 2 row 4: RETIRE marks trace excluded from retrieval ──
|
|
|
|
func TestRetire_ExcludedFromSearch(t *testing.T) {
|
|
s := newTestStore(t)
|
|
a, _ := s.Add(json.RawMessage(`{"v":1}`), "common")
|
|
b, _ := s.Add(json.RawMessage(`{"v":2}`), "common")
|
|
if err := s.Retire(a.UID); err != nil {
|
|
t.Fatalf("Retire: %v", err)
|
|
}
|
|
|
|
results := s.Search(SearchFilter{Tag: "common"})
|
|
if len(results) != 1 || results[0].UID != b.UID {
|
|
t.Errorf("Search excluded retired? got %d results, want 1 (active only)", len(results))
|
|
}
|
|
|
|
// IncludeRetired flag returns both.
|
|
withRetired := s.Search(SearchFilter{Tag: "common", IncludeRetired: true})
|
|
if len(withRetired) != 2 {
|
|
t.Errorf("IncludeRetired Search returned %d, want 2", len(withRetired))
|
|
}
|
|
}
|
|
|
|
func TestRetire_StillAccessibleViaGet(t *testing.T) {
|
|
// Per ADR-004: "Retired traces are excluded from Search by default
|
|
// but accessible via Get and History." Locks that contract.
|
|
s := newTestStore(t)
|
|
tr, _ := s.Add(json.RawMessage(`{"v":1}`))
|
|
s.Retire(tr.UID)
|
|
|
|
got, err := s.Get(tr.UID)
|
|
if err != nil {
|
|
t.Fatalf("retired trace Get: %v", err)
|
|
}
|
|
if !got.Retired {
|
|
t.Error("Get should preserve retired flag")
|
|
}
|
|
}
|
|
|
|
func TestRetire_StillAccessibleViaHistory(t *testing.T) {
|
|
s := newTestStore(t)
|
|
a, _ := s.Add(json.RawMessage(`{"v":1}`))
|
|
b, _ := s.Revise(a.UID, json.RawMessage(`{"v":2}`))
|
|
s.Retire(a.UID)
|
|
|
|
chain, err := s.History(b.UID)
|
|
if err != nil {
|
|
t.Fatalf("History: %v", err)
|
|
}
|
|
if len(chain) != 2 {
|
|
t.Errorf("chain length = %d, want 2 (revision + retired root)", len(chain))
|
|
}
|
|
if !chain[1].Retired {
|
|
t.Error("retired predecessor should still appear in History with Retired=true")
|
|
}
|
|
}
|
|
|
|
// ── Sprint 2 row 5: HISTORY chain is cycle-safe ────────────────
|
|
|
|
func TestHistory_CycleDetected(t *testing.T) {
|
|
// Cycles can't form via the public API (new UIDs every Revise),
|
|
// but corruption could create one. Inject one directly into the
|
|
// internal map and verify History rejects it.
|
|
s := newTestStore(t)
|
|
s.traces["A"] = &Trace{UID: "A", PredecessorUID: "B"}
|
|
s.traces["B"] = &Trace{UID: "B", PredecessorUID: "A"}
|
|
|
|
_, err := s.History("A")
|
|
if !errors.Is(err, ErrCycle) {
|
|
t.Errorf("expected ErrCycle, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestHistory_PredecessorMissing_TruncatesChain(t *testing.T) {
|
|
s := newTestStore(t)
|
|
tr := &Trace{UID: "X", PredecessorUID: "ghost"}
|
|
s.traces["X"] = tr
|
|
|
|
chain, err := s.History("X")
|
|
if err != nil {
|
|
t.Fatalf("History on partial chain: %v", err)
|
|
}
|
|
if len(chain) != 1 {
|
|
t.Errorf("partial chain returned %d, want 1 (truncate at missing predecessor)", len(chain))
|
|
}
|
|
}
|
|
|
|
func TestHistory_UnknownUID_ErrorsClean(t *testing.T) {
|
|
s := newTestStore(t)
|
|
_, err := s.History("nope")
|
|
if !errors.Is(err, ErrNotFound) {
|
|
t.Errorf("expected ErrNotFound, got %v", err)
|
|
}
|
|
}
|
|
|
|
// ── Sprint 2 row 6: replay_count increments on duplicate ADD ───
|
|
|
|
func TestAddIdempotent_IncrementsReplayCount(t *testing.T) {
|
|
s := newTestStore(t)
|
|
|
|
first, err := s.AddIdempotent("custom-uid", json.RawMessage(`{"v":1}`))
|
|
if err != nil {
|
|
t.Fatalf("first AddIdempotent: %v", err)
|
|
}
|
|
if first.ReplayCount != 1 {
|
|
t.Errorf("first ReplayCount = %d, want 1", first.ReplayCount)
|
|
}
|
|
|
|
second, err := s.AddIdempotent("custom-uid", json.RawMessage(`{"v":"different"}`))
|
|
if err != nil {
|
|
t.Fatalf("second AddIdempotent: %v", err)
|
|
}
|
|
if second.ReplayCount != 2 {
|
|
t.Errorf("after second add, ReplayCount = %d, want 2", second.ReplayCount)
|
|
}
|
|
|
|
// Original content preserved (replay does NOT overwrite).
|
|
if !strings.Contains(string(second.Content), "v") ||
|
|
!strings.Contains(string(second.Content), "1") {
|
|
t.Errorf("replay should preserve original content, got %s", second.Content)
|
|
}
|
|
}
|
|
|
|
func TestAddIdempotent_RejectsEmptyUID(t *testing.T) {
|
|
s := newTestStore(t)
|
|
_, err := s.AddIdempotent("", json.RawMessage(`{}`))
|
|
if !errors.Is(err, ErrEmptyUID) {
|
|
t.Errorf("expected ErrEmptyUID, got %v", err)
|
|
}
|
|
}
|
|
|
|
// ── Sprint 2 row 7: corrupted memory row recovery — persistence round-trip (corrupted-line cases live in persistor_test.go) ──
|
|
|
|
func TestPersistor_RoundTrip(t *testing.T) {
|
|
s, path := newPersistedStore(t)
|
|
|
|
a, _ := s.Add(json.RawMessage(`{"v":1}`), "alpha")
|
|
b, _ := s.Revise(a.UID, json.RawMessage(`{"v":2}`), "alpha")
|
|
s.Retire(a.UID)
|
|
_ = b
|
|
|
|
// Open fresh store against same file, replay.
|
|
p, _ := NewPersistor(path)
|
|
s2 := NewStore(p)
|
|
n, err := s2.Load()
|
|
if err != nil {
|
|
t.Fatalf("Load: %v", err)
|
|
}
|
|
if n != 3 {
|
|
t.Errorf("replayed %d events, want 3", n)
|
|
}
|
|
stats := s2.Stats()
|
|
if stats.Total != 2 {
|
|
t.Errorf("Stats.Total = %d, want 2", stats.Total)
|
|
}
|
|
if stats.Retired != 1 {
|
|
t.Errorf("Stats.Retired = %d, want 1", stats.Retired)
|
|
}
|
|
|
|
got, _ := s2.Get(a.UID)
|
|
if !got.Retired {
|
|
t.Error("retired flag lost across persistence round-trip")
|
|
}
|
|
}
|
|
|
|
// ── Search filter coverage ─────────────────────────────────────
|
|
|
|
func TestSearch_TagFilter(t *testing.T) {
|
|
s := newTestStore(t)
|
|
s.Add(json.RawMessage(`{"v":1}`), "production")
|
|
s.Add(json.RawMessage(`{"v":2}`), "test")
|
|
s.Add(json.RawMessage(`{"v":3}`), "production", "edge")
|
|
|
|
prodHits := s.Search(SearchFilter{Tag: "production"})
|
|
if len(prodHits) != 2 {
|
|
t.Errorf("tag=production returned %d, want 2", len(prodHits))
|
|
}
|
|
|
|
edgeHits := s.Search(SearchFilter{Tag: "edge"})
|
|
if len(edgeHits) != 1 {
|
|
t.Errorf("tag=edge returned %d, want 1", len(edgeHits))
|
|
}
|
|
}
|
|
|
|
func TestSearch_ContentContainsFilter(t *testing.T) {
|
|
s := newTestStore(t)
|
|
s.Add(json.RawMessage(`{"role":"welder","city":"Chicago"}`))
|
|
s.Add(json.RawMessage(`{"role":"electrician","city":"Detroit"}`))
|
|
s.Add(json.RawMessage(`{"role":"safety","city":"Chicago"}`))
|
|
|
|
chi := s.Search(SearchFilter{ContentContains: "Chicago"})
|
|
if len(chi) != 2 {
|
|
t.Errorf("ContentContains=Chicago returned %d, want 2", len(chi))
|
|
}
|
|
}
|
|
|
|
func TestStats_TracksAllStates(t *testing.T) {
|
|
s := newTestStore(t)
|
|
a, _ := s.Add(json.RawMessage(`{}`))
|
|
s.Add(json.RawMessage(`{}`))
|
|
s.Add(json.RawMessage(`{}`))
|
|
s.Retire(a.UID)
|
|
|
|
st := s.Stats()
|
|
if st.Total != 3 {
|
|
t.Errorf("Total = %d, want 3", st.Total)
|
|
}
|
|
if st.Active != 2 {
|
|
t.Errorf("Active = %d, want 2", st.Active)
|
|
}
|
|
if st.Retired != 1 {
|
|
t.Errorf("Retired = %d, want 1", st.Retired)
|
|
}
|
|
}
|
|
|
|
// ── Concurrency safety ────────────────────────────────────────
|
|
|
|
func TestStore_ConcurrentAdd(t *testing.T) {
|
|
s := newTestStore(t)
|
|
const N = 100
|
|
done := make(chan bool, N)
|
|
for i := 0; i < N; i++ {
|
|
go func() {
|
|
_, err := s.Add(json.RawMessage(`{"x":1}`))
|
|
if err != nil {
|
|
t.Errorf("concurrent Add: %v", err)
|
|
}
|
|
done <- true
|
|
}()
|
|
}
|
|
for i := 0; i < N; i++ {
|
|
<-done
|
|
}
|
|
if s.Stats().Total != N {
|
|
t.Errorf("after %d concurrent Adds, Total = %d", N, s.Stats().Total)
|
|
}
|
|
}
|