// Package observer is the Go port of mcp-server/observer.ts (Rust // system, 852 lines TS) — the "third-party witness" loop that records // every observed operation, surfaces failures, and feeds learnings // back into the substrate. // // What this package owns (this commit): // - ObservedOp data model + ring buffer + JSONL persistence // - Stats aggregation (total / successes / failures / by_source) // - Source taxonomy (mcp / scenario / langfuse / overseer_correction) // // What's deferred to follow-up commits: // - /review endpoint with cloud-LLM hand-review (the heuristic // plus qwen3-coder fall-back path) // - tailOverseerCorrections (background loop reading // overseer_corrections.jsonl) // - analyzeErrors / consolidatePlaybooks periodic loops // - escalateFailureClusterToLLMTeam (failure clustering trigger) // // /relevance was already ported in 9588bd8 (component 3 of SPEC §3.4) // and lives in internal/matrix/relevance.go; the observer package // doesn't re-implement it. package observer import ( "errors" "time" ) // Source is the provenance of an observed op. Empty string defaults // to SourceMCP for back-compat with Phase 24 callers. type Source string const ( SourceMCP Source = "mcp" SourceScenario Source = "scenario" SourceLangfuse Source = "langfuse" SourceOverseerCorrection Source = "overseer_correction" // SourceWorkflow tags ObservedOps emitted by the workflow runner // (one per node execution). Added 2026-04-29 scrum2 (Opus BLOCK): // the workflow handler was casting a string literal to Source, // which worked coincidentally but left the taxonomy implicit. SourceWorkflow Source = "workflow" // SourceInbox tags ObservedOps emitted by /observer/inbox — incoming // real-world signals (email, SMS) that a coordinator would receive // and act on. The handler only RECORDS the message; downstream // triggers (e.g. matrix.search on the parsed demand) are the // caller's concern, recorded separately. SourceInbox Source = "inbox" ) // ObservedOp is one entry in the observer's ring buffer (and JSONL // log when persistence is configured). Mirrors the Rust ObservedOp // shape exactly so the on-wire JSON round-trips between the two // implementations during the Rust→Go cutover. // // Optional fields use omitempty so absent values don't bloat the // JSONL file. Numeric zero values are intentionally treated as // "not set" by the JSON layer; if a real zero needs to be // persisted, future schema-version bump can switch to pointers. type ObservedOp struct { Timestamp string `json:"timestamp"` // ISO 8601 Endpoint string `json:"endpoint"` InputSummary string `json:"input_summary"` Success bool `json:"success"` DurationMs int64 `json:"duration_ms"` OutputSummary string `json:"output_summary"` Error string `json:"error,omitempty"` Source Source `json:"source,omitempty"` StafferID string `json:"staffer_id,omitempty"` SigHash string `json:"sig_hash,omitempty"` EventKind string `json:"event_kind,omitempty"` Role string `json:"role,omitempty"` City string `json:"city,omitempty"` State string `json:"state,omitempty"` Count int `json:"count,omitempty"` RescueAttempted bool `json:"rescue_attempted,omitempty"` RescueSucceeded bool `json:"rescue_succeeded,omitempty"` TaskClass string `json:"task_class,omitempty"` Correction string `json:"correction,omitempty"` AppliedAtTurn int `json:"applied_at_turn,omitempty"` } // Stats is the aggregated view of the ring buffer — useful for // dashboards and the GET /stats endpoint. RecentScenarios holds the // most recent N source=scenario ops (default cap 10) so operators // can see what the staffing scenarios are emitting at a glance. type Stats struct { Total int `json:"total"` Successes int `json:"successes"` Failures int `json:"failures"` BySource map[string]int `json:"by_source"` RecentScenarios []ScenarioOpDigest `json:"recent_scenario_ops"` } // ScenarioOpDigest is the slim per-op shape returned in // Stats.RecentScenarios — matches the TS digest exactly: // {ts, ok, staffer, kind, role}. type ScenarioOpDigest struct { TS string `json:"ts"` OK bool `json:"ok"` Staffer string `json:"staffer"` Kind string `json:"kind"` Role string `json:"role"` } // Errors surfaced to HTTP handlers. var ( ErrInvalidOp = errors.New("observer: invalid op (timestamp + endpoint required)") ) // Validate returns an error if required fields are missing. Called // by Record before the op is added to the ring buffer. func (op ObservedOp) Validate() error { if op.Timestamp == "" { return ErrInvalidOp } if op.Endpoint == "" { return ErrInvalidOp } return nil } // EnsureTimestamp populates Timestamp with the current UTC ISO 8601 // time if it's empty. Useful for HTTP handlers that take the body // as authoritative but need to default the timestamp when absent. func (op *ObservedOp) EnsureTimestamp() { if op.Timestamp == "" { op.Timestamp = time.Now().UTC().Format(time.RFC3339) } } // DefaultSource sets Source to SourceMCP if empty. Mirrors the Rust // `op.source ?? "mcp"` pattern in recordExternalOp. func (op *ObservedOp) DefaultSource() { if op.Source == "" { op.Source = SourceMCP } }