Closes Sprint 2 design-bar work (audit reports/scrum/sprint-backlog.md):
S2.1 — ADR-004 documents the pathway-memory data model
S2.2 — pathway port lands with deterministic fixture corpus
and full test coverage on day one
S2.3 — retired traces are excluded from retrieval (test
passes; would fail without the filter)
Mem0-style operations: Add / AddIdempotent / Update / Revise /
Retire / Get / History / Search. Each operation is a method on
Store; persistence is JSONL append-only with corruption recovery
on Replay.
internal/pathway/types.go Trace + event + SearchFilter + sentinel errors
internal/pathway/store.go in-memory state + RWMutex + ops
internal/pathway/persistor.go JSONL append-only log with replay
internal/pathway/store_test.go 20 test funcs covering all 7
Sprint 2 claim rows + concurrency
internal/pathway/persistor_test.go 6 test funcs covering missing-
file, corruption recovery, long-line
handling, parent-dir auto-create,
apply-error skip behavior
Sprint 2 claim coverage row-by-row:
ADD TestAdd_AssignsUIDAndTimestamps + TestAdd_RejectsInvalidJSON
UPDATE TestUpdate_ReplacesContentSameUID + TestUpdate_MissingUID_Errors
REVISE TestRevise_LinksToPredecessorViaHistory +
TestRevise_PredecessorMissing_Errors +
TestRevise_ChainOfThree_BackwardWalk
RETIRE TestRetire_ExcludedFromSearch +
TestRetire_StillAccessibleViaGet +
TestRetire_StillAccessibleViaHistory
HISTORY/cycle TestHistory_CycleDetected (injected via internal map),
TestHistory_PredecessorMissing_TruncatesChain,
TestHistory_UnknownUID_ErrorsClean
REPLAY/dup TestAddIdempotent_IncrementsReplayCount (locks the
"replay preserves original content" rule per ADR-004)
CORRUPTION TestPersistor_CorruptedLines_Skipped +
TestPersistor_ApplyError_Skipped
ROUND-TRIP TestPersistor_RoundTrip locks the full Save → fresh
Store → Load → Stats-match contract
Two real bugs caught during testing:
- Add returned the same *Trace stored in the map, so callers
holding a reference saw later mutations. Fixed: clone before
return (matches Get's contract). Same fix in AddIdempotent
+ Revise.
- Test typo: {"v":different} isn't valid JSON; AddIdempotent's
json.Valid rejected it as ErrInvalidContent. Test fixed to
use {"v":"different"}; the validation behavior is correct.
Skipped this commit (next):
- cmd/pathwayd HTTP binary
- gateway routing for /v1/pathway/*
- end-to-end smoke
These add the wire surface; the substrate ships first so the
wire layer can be a pure proxy in the next commit.
Verified:
go test -count=1 ./internal/pathway/ — 26 tests green
just verify — vet + test + 9 smokes 34s
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
131 lines
3.9 KiB
Go
131 lines
3.9 KiB
Go
// persistor.go — JSONL append-only persistence for pathway memory.
|
|
//
|
|
// Each event is one JSON line. Append is O(1) (open append, write,
|
|
// close — Go's *os.File default fsync policy is "rely on OS" which
|
|
// is fine here; correctness on power-loss is best-effort, not
|
|
// transactional). Replay reads the file once at startup.
|
|
//
|
|
// Corruption recovery: malformed lines are logged at warn level and
// skipped (Replay's return value counts only the events that were
// applied) but do not stop the load. Partial state is better than no
// state for an agent substrate.
|
|
//
|
|
// What's NOT here:
|
|
// - Compaction. JSONL grows linearly with mutations; below 100K
|
|
// traces this is fine. Compaction will land when needed and
|
|
// will emit a snapshot file + tail JSONL.
|
|
// - fsync per write. We rely on the OS's eventual fsync; trace
|
|
// loss on hard crash is acceptable for the substrate's
|
|
// "remember most things" guarantee.
|
|
|
|
package pathway
|
|
|
|
import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path/filepath"
)
|
|
|
|
// Persistor is a handle on one JSONL event log on disk. It stores only
// the log's location: constructing it via NewPersistor reads nothing,
// and Append and Replay each open the file themselves. Callers that
// want prior state loaded must trigger a replay explicitly (per the
// package's convention, through Store.Load()).
type Persistor struct {
	// path is the filesystem location of the JSONL log.
	path string
}
|
|
|
|
// NewPersistor returns a persistor for the given file path. The
|
|
// parent directory is created on demand. The file is created lazily
|
|
// on first Append.
|
|
func NewPersistor(path string) (*Persistor, error) {
|
|
if path == "" {
|
|
return nil, errors.New("pathway: persistor path is empty")
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
|
return nil, fmt.Errorf("pathway: create dir: %w", err)
|
|
}
|
|
return &Persistor{path: path}, nil
|
|
}
|
|
|
|
// Path returns the underlying file path. Useful for tests + logs.
|
|
func (p *Persistor) Path() string { return p.path }
|
|
|
|
// Append writes one event to the JSONL log. Each call opens the
|
|
// file in append mode, writes one line, and closes — simple but
|
|
// correct. A pooled persistent fd is a future optimization if
|
|
// profiling shows append-rate matters.
|
|
func (p *Persistor) Append(e event) error {
|
|
line, err := json.Marshal(e)
|
|
if err != nil {
|
|
return fmt.Errorf("pathway: marshal event: %w", err)
|
|
}
|
|
f, err := os.OpenFile(p.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return fmt.Errorf("pathway: open log: %w", err)
|
|
}
|
|
defer f.Close()
|
|
if _, err := f.Write(line); err != nil {
|
|
return fmt.Errorf("pathway: write event: %w", err)
|
|
}
|
|
if _, err := f.Write([]byte{'\n'}); err != nil {
|
|
return fmt.Errorf("pathway: write newline: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Replay reads the log line-by-line and invokes apply for each
|
|
// event. Returns the count of events successfully applied. A
|
|
// missing file is NOT an error (means "no prior state"); a
|
|
// partially-corrupt file logs warns and continues.
|
|
func (p *Persistor) Replay(apply func(event) error) (int, error) {
|
|
f, err := os.Open(p.path)
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
return 0, nil
|
|
}
|
|
if err != nil {
|
|
return 0, fmt.Errorf("pathway: open log: %w", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
// Big buffer for unusually long content — 1 MiB per line cap.
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 1<<20)
|
|
|
|
applied := 0
|
|
skipped := 0
|
|
lineNo := 0
|
|
for scanner.Scan() {
|
|
lineNo++
|
|
raw := scanner.Bytes()
|
|
if len(raw) == 0 {
|
|
continue
|
|
}
|
|
var e event
|
|
if err := json.Unmarshal(raw, &e); err != nil {
|
|
slog.Warn("pathway: replay skipped malformed line",
|
|
"path", p.path, "line", lineNo, "err", err.Error())
|
|
skipped++
|
|
continue
|
|
}
|
|
if err := apply(e); err != nil {
|
|
slog.Warn("pathway: replay event apply failed",
|
|
"path", p.path, "line", lineNo, "op", e.Op, "err", err.Error())
|
|
skipped++
|
|
continue
|
|
}
|
|
applied++
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
return applied, fmt.Errorf("pathway: scan log: %w", err)
|
|
}
|
|
if skipped > 0 {
|
|
slog.Info("pathway: replay completed with skips",
|
|
"path", p.path, "applied", applied, "skipped", skipped)
|
|
}
|
|
return applied, nil
|
|
}
|