package observer // Store: in-memory ring buffer + optional JSONL persistor. Same // shape as internal/pathway's persistor (afbb506) — opens the file // per Append rather than holding an fd, which is fine at the // observer's expected write rate (≤ a few hundred ops/min) and // keeps the substrate restartable mid-stream. import ( "bufio" "encoding/json" "errors" "fmt" "io/fs" "log/slog" "os" "path/filepath" "sync" ) // DefaultRingCap is the in-memory ring buffer cap. Mirrors the Rust // Phase 24 limit of 2000 (recordExternalOp shifts the head when // length > 2000). const DefaultRingCap = 2000 // DefaultRecentScenariosCap is how many recent source=scenario ops // the Stats endpoint returns. Matches the TS hard-coded slice(-10). const DefaultRecentScenariosCap = 10 // Store holds the ring buffer + the optional persistor. Thread-safe // via a single RWMutex (read-heavy via Stats; writes via Record). type Store struct { mu sync.RWMutex ring []ObservedOp cap int persistor *Persistor } // NewStore returns an empty Store. Pass nil persistor for in-memory // only (unit tests, ephemeral runs); pass a real Persistor to enable // jsonl-append-on-record. func NewStore(persistor *Persistor) *Store { return &Store{ ring: make([]ObservedOp, 0, DefaultRingCap), cap: DefaultRingCap, persistor: persistor, } } // Record validates + persists + appends. Order matters: persist // first so a crash mid-record doesn't leave the ring ahead of the // log. Returns ErrInvalidOp on validation failure (no persist, no // append). func (s *Store) Record(op ObservedOp) error { op.EnsureTimestamp() op.DefaultSource() if err := op.Validate(); err != nil { return err } if s.persistor != nil { if err := s.persistor.Append(op); err != nil { // Best-effort persistence — log but don't fail the // in-memory record. Mirrors the Rust catch{} in // persistOp; the ring buffer is the source of truth in // flight. slog.Warn("observer: persist failed", "err", err) } } s.mu.Lock() defer s.mu.Unlock() s.ring = append(s.ring, op) if len(s.ring) > s.cap { // Shift left by one (drop oldest). Avoids unbounded growth // without a per-write reallocation. copy(s.ring, s.ring[1:]) s.ring = s.ring[:len(s.ring)-1] } return nil } // Recent returns a copy of the ring buffer's current state. Most // recent entries are at the end (append-order). func (s *Store) Recent() []ObservedOp { s.mu.RLock() defer s.mu.RUnlock() out := make([]ObservedOp, len(s.ring)) copy(out, s.ring) return out } // Stats aggregates the ring buffer. Mirrors the Rust /stats // response shape exactly. func (s *Store) Stats() Stats { s.mu.RLock() defer s.mu.RUnlock() stats := Stats{ Total: len(s.ring), BySource: make(map[string]int), } for _, op := range s.ring { if op.Success { stats.Successes++ } else { stats.Failures++ } src := string(op.Source) if src == "" { src = string(SourceMCP) } stats.BySource[src]++ } // Last N scenario ops (most-recent-first → match Rust slice(-10)). scenarios := make([]ScenarioOpDigest, 0, DefaultRecentScenariosCap) for i := len(s.ring) - 1; i >= 0 && len(scenarios) < DefaultRecentScenariosCap; i-- { op := s.ring[i] if op.Source != SourceScenario { continue } scenarios = append([]ScenarioOpDigest{{ TS: op.Timestamp, OK: op.Success, Staffer: op.StafferID, Kind: op.EventKind, Role: op.Role, }}, scenarios...) } stats.RecentScenarios = scenarios return stats } // Load replays the persistor's JSONL log into the ring buffer. // Resets the ring (current state is discarded) — same semantics as // pathway.Store.Load. Corruption-tolerant: malformed lines log // warnings and the load proceeds. // // Returns the number of ops successfully replayed. func (s *Store) Load() (int, error) { if s.persistor == nil { return 0, nil } s.mu.Lock() defer s.mu.Unlock() s.ring = s.ring[:0] return s.persistor.Replay(func(op ObservedOp) error { s.ring = append(s.ring, op) if len(s.ring) > s.cap { copy(s.ring, s.ring[1:]) s.ring = s.ring[:len(s.ring)-1] } return nil }) } // ─── Persistor ────────────────────────────────────────────────── // Persistor wraps a single JSONL file. Open-per-append — same // pattern as internal/pathway. Each line is one ObservedOp. type Persistor struct { path string } // NewPersistor returns a Persistor for the given file path. Parent // directory is created on demand. Empty path is invalid (caller // passes nil to NewStore for the no-persist case). func NewPersistor(path string) (*Persistor, error) { if path == "" { return nil, errors.New("observer: persistor path is empty") } if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return nil, fmt.Errorf("observer: create dir: %w", err) } return &Persistor{path: path}, nil } // Path returns the file path the persistor writes to. func (p *Persistor) Path() string { return p.path } // Append writes one ObservedOp as a JSONL line. func (p *Persistor) Append(op ObservedOp) error { line, err := json.Marshal(op) if err != nil { return fmt.Errorf("observer: marshal op: %w", err) } f, err := os.OpenFile(p.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return fmt.Errorf("observer: open log: %w", err) } defer f.Close() if _, err := f.Write(line); err != nil { return fmt.Errorf("observer: write op: %w", err) } if _, err := f.Write([]byte{'\n'}); err != nil { return fmt.Errorf("observer: write newline: %w", err) } return nil } // Replay reads the log line-by-line and invokes apply for each op. // Returns the count successfully applied. Missing file = 0 + nil // (legitimate cold-start state). Malformed lines log a warning and // the replay continues. func (p *Persistor) Replay(apply func(ObservedOp) error) (int, error) { f, err := os.Open(p.path) if errors.Is(err, fs.ErrNotExist) { return 0, nil } if err != nil { return 0, fmt.Errorf("observer: open log: %w", err) } defer f.Close() scanner := bufio.NewScanner(f) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1<<20) // 1 MiB per line cap applied, skipped, lineNo := 0, 0, 0 for scanner.Scan() { lineNo++ raw := scanner.Bytes() if len(raw) == 0 { continue } var op ObservedOp if err := json.Unmarshal(raw, &op); err != nil { slog.Warn("observer: replay skipped malformed line", "path", p.path, "line", lineNo, "err", err.Error()) skipped++ continue } if err := apply(op); err != nil { slog.Warn("observer: replay apply failed", "path", p.path, "line", lineNo, "err", err.Error()) skipped++ continue } applied++ } if err := scanner.Err(); err != nil { return applied, fmt.Errorf("observer: scan log: %w", err) } if skipped > 0 { slog.Info("observer: replay completed with skips", "path", p.path, "applied", applied, "skipped", skipped) } return applied, nil }