Phase 3 ask: real-world inbox-style event injection during the stress
test. Coordinators in production receive emails + SMS that trigger
contract responses; the substrate has to RECORD these signals AND
react with a search using the embedded demand. This commit lands the
endpoint and exercises it end-to-end in the stress harness.

observerd surface:
- New POST /observer/inbox route: accepts {type, sender, subject,
  body, priority, tag} and records it as an ObservedOp with
  Source=SourceInbox. Type must be email|sms; body is required;
  priority defaults to medium. The handler ONLY records; downstream
  triggers (search, ingest, etc.) are the caller's concern, recorded
  separately. Keeps the witness role pure.
- New observer.SourceInbox = "inbox" alongside SourceMCP /
  SourceScenario / SourceWorkflow.
- Three contract tests on the new route (happy path / bad type / empty
  body), router-mount test extended, all green.

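The validation rules listed for the inbox route can be sketched as
below. This is a minimal illustration, not the handler that landed:
the InboxEvent struct, normalizeInbox, and both error values are
hypothetical names, and treating a whitespace-only body as empty is an
assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// InboxEvent mirrors the fields the commit message lists for
// POST /observer/inbox. The struct name and JSON tags are assumptions.
type InboxEvent struct {
	Type     string `json:"type"`
	Sender   string `json:"sender"`
	Subject  string `json:"subject"`
	Body     string `json:"body"`
	Priority string `json:"priority"`
	Tag      string `json:"tag"`
}

// Hypothetical error values for the two rejection cases.
var (
	errBadType   = errors.New("inbox: type must be email or sms")
	errEmptyBody = errors.New("inbox: body required")
)

// normalizeInbox applies the stated rules: type must be email|sms,
// body is required, and priority defaults to medium.
func normalizeInbox(ev InboxEvent) (InboxEvent, error) {
	if ev.Type != "email" && ev.Type != "sms" {
		return ev, errBadType
	}
	// Assumption: a body containing only whitespace counts as empty.
	if strings.TrimSpace(ev.Body) == "" {
		return ev, errEmptyBody
	}
	if ev.Priority == "" {
		ev.Priority = "medium"
	}
	return ev, nil
}

func main() {
	ev, err := normalizeInbox(InboxEvent{Type: "sms", Body: "drone Chicago"})
	fmt.Println(ev.Priority, err)

	_, err = normalizeInbox(InboxEvent{Type: "fax", Body: "hello"})
	fmt.Println(err)
}
```

The three cases here line up with the contract tests named above
(happy path / bad type / empty body).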
Stress harness phase 1c (Hour 9):
- 6 inbox events fire in priority order (urgent → high → medium):
    2 urgent emails (forklift Cleveland, production Indianapolis)
    1 high email (crane Chicago)
    1 high sms (bilingual safety Indianapolis)
    1 medium sms (drone Chicago)
    1 medium email (warehouse Milwaukee FYI)
- Each event:
    1. POSTs to /v1/observer/inbox (recorded by observerd)
    2. Triggers matrix.search using a parsed demand (the demand
       extraction is hard-coded for now; production needs a small
       LLM to parse from body)
    3. Captures both as events in the run JSON

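The per-event loop described above might look roughly like this. It
is a sketch under assumptions: inboxEvent, runEvent, rank, and
runInboxBurst are hypothetical names, the record and search steps are
injected as plain functions instead of real HTTP calls, and sorting by
priority inside the harness is assumed (the real harness may simply
list the events in that order).

```go
package main

import (
	"fmt"
	"sort"
)

// inboxEvent and runEvent are hypothetical harness types.
type inboxEvent struct {
	Type     string // "email" or "sms"
	Priority string // "urgent", "high", or "medium"
	Demand   string // hard-coded demand, standing in for LLM parsing
}

type runEvent struct {
	Kind   string // "inbox" or "search"
	Detail string
}

// rank orders priorities urgent → high → medium; unknown values sort last.
func rank(p string) int {
	switch p {
	case "urgent":
		return 0
	case "high":
		return 1
	case "medium":
		return 2
	}
	return 3
}

// runInboxBurst records each event, triggers a search on its demand,
// and captures both steps, in priority order.
func runInboxBurst(
	events []inboxEvent,
	record func(inboxEvent) error,
	search func(demand string) (string, error),
) ([]runEvent, error) {
	// Copy before sorting so the caller's slice is left untouched.
	ordered := append([]inboxEvent(nil), events...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return rank(ordered[i].Priority) < rank(ordered[j].Priority)
	})

	var captured []runEvent
	for _, ev := range ordered {
		if err := record(ev); err != nil {
			return captured, fmt.Errorf("record inbox event: %w", err)
		}
		captured = append(captured, runEvent{Kind: "inbox", Detail: ev.Priority + " " + ev.Type})

		top, err := search(ev.Demand)
		if err != nil {
			return captured, fmt.Errorf("search: %w", err)
		}
		captured = append(captured, runEvent{Kind: "search", Detail: top})
	}
	return captured, nil
}

func main() {
	events := []inboxEvent{
		{Type: "sms", Priority: "medium", Demand: "drone Chicago"},
		{Type: "email", Priority: "urgent", Demand: "forklift Cleveland"},
		{Type: "email", Priority: "high", Demand: "crane Chicago"},
	}
	// Stubs stand in for the POST to observerd and for matrix.search.
	record := func(inboxEvent) error { return nil }
	search := func(d string) (string, error) { return "top-1 for " + d, nil }

	captured, err := runInboxBurst(events, record, search)
	fmt.Println(len(captured), err)
	for _, e := range captured {
		fmt.Println(e.Kind, e.Detail)
	}
}
```

Injecting record and search keeps the recording step separate from
the trigger, which matches the "handler only records" split on the
observerd side.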
Run #006 result (with v2-moe embedder + all phases including inbox):
  Diversity:
    Same-role-across-contracts Jaccard = 0.000 (n=9)
    Different-roles-same-contract Jaccard = 0.046 (n=18)
  Determinism: 1.000
  Verbatim handover: 4/4 (100%)
  Paraphrase handover: 4/4 (100%)
  Inbox burst:
    6/6 events accepted by observerd (200 status, all recorded)
    6/6 triggered searches produced distinct top-1 worker IDs
    distance distribution: 0.24 (Indy production) → 0.71 (Chicago
    drone surveyor: an honest stretch, since drones aren't in the
    5K-worker corpus; the system surfaces the closest neighbor at
    high distance rather than fabricating)

The drone-Chicago case is the architectural-honesty signal: when
the demand asks for a specialist NOT in the roster, the system
returns the closest semantic neighbor with a distance that flags
"this is a stretch." Coordinators reading distances see "we don't
have a great match here" rather than a confident wrong answer.

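One way a caller could surface that signal is to label results past
some distance cutoff. The sketch below is purely illustrative: the
commit defines no threshold, so stretchThreshold (0.60) is a made-up
value, and match / describe are hypothetical names.

```go
package main

import "fmt"

// stretchThreshold is a hypothetical cutoff, not taken from the source.
const stretchThreshold = 0.60

// match is a hypothetical top-1 search result.
type match struct {
	WorkerID string
	Distance float64
}

// describe renders a match and marks it as a stretch when the
// distance is at or above the cutoff.
func describe(m match) string {
	if m.Distance >= stretchThreshold {
		return fmt.Sprintf("%s (distance %.2f, stretch: no close match in roster)", m.WorkerID, m.Distance)
	}
	return fmt.Sprintf("%s (distance %.2f)", m.WorkerID, m.Distance)
}

func main() {
	// Distances taken from the run above; worker IDs are invented.
	fmt.Println(describe(match{WorkerID: "w-indy", Distance: 0.24}))
	fmt.Println(describe(match{WorkerID: "w-chi", Distance: 0.71}))
}
```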
Total events captured: 67 (was 61 pre-inbox).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
143 lines
5.3 KiB
Go
// Package observer is the Go port of mcp-server/observer.ts (Rust
// system, 852 lines TS): the "third-party witness" loop that records
// every observed operation, surfaces failures, and feeds learnings
// back into the substrate.
//
// What this package owns (this commit):
//   - ObservedOp data model + ring buffer + JSONL persistence
//   - Stats aggregation (total / successes / failures / by_source)
//   - Source taxonomy (mcp / scenario / langfuse / overseer_correction)
//
// What's deferred to follow-up commits:
//   - /review endpoint with cloud-LLM hand-review (the heuristic
//     plus qwen3-coder fall-back path)
//   - tailOverseerCorrections (background loop reading
//     overseer_corrections.jsonl)
//   - analyzeErrors / consolidatePlaybooks periodic loops
//   - escalateFailureClusterToLLMTeam (failure clustering trigger)
//
// /relevance was already ported in 9588bd8 (component 3 of SPEC §3.4)
// and lives in internal/matrix/relevance.go; the observer package
// doesn't re-implement it.

package observer

import (
	"errors"
	"time"
)

// Source is the provenance of an observed op. Empty string defaults
// to SourceMCP for back-compat with Phase 24 callers.
type Source string

const (
	SourceMCP                Source = "mcp"
	SourceScenario           Source = "scenario"
	SourceLangfuse           Source = "langfuse"
	SourceOverseerCorrection Source = "overseer_correction"
	// SourceWorkflow tags ObservedOps emitted by the workflow runner
	// (one per node execution). Added 2026-04-29 scrum2 (Opus BLOCK):
	// the workflow handler was casting a string literal to Source,
	// which worked coincidentally but left the taxonomy implicit.
	SourceWorkflow Source = "workflow"
	// SourceInbox tags ObservedOps emitted by /observer/inbox: incoming
	// real-world signals (email, SMS) that a coordinator would receive
	// and act on. The handler only RECORDS the message; downstream
	// triggers (e.g. matrix.search on the parsed demand) are the
	// caller's concern, recorded separately.
	SourceInbox Source = "inbox"
)

// ObservedOp is one entry in the observer's ring buffer (and JSONL
// log when persistence is configured). Mirrors the Rust ObservedOp
// shape exactly so the on-wire JSON round-trips between the two
// implementations during the Rust→Go cutover.
//
// Optional fields use omitempty so absent values don't bloat the
// JSONL file. Numeric zero values are intentionally treated as
// "not set" by the JSON layer; if a real zero needs to be
// persisted, future schema-version bump can switch to pointers.
type ObservedOp struct {
	Timestamp     string `json:"timestamp"` // ISO 8601
	Endpoint      string `json:"endpoint"`
	InputSummary  string `json:"input_summary"`
	Success       bool   `json:"success"`
	DurationMs    int64  `json:"duration_ms"`
	OutputSummary string `json:"output_summary"`
	Error         string `json:"error,omitempty"`

	Source    Source `json:"source,omitempty"`
	StafferID string `json:"staffer_id,omitempty"`
	SigHash   string `json:"sig_hash,omitempty"`
	EventKind string `json:"event_kind,omitempty"`
	Role      string `json:"role,omitempty"`
	City      string `json:"city,omitempty"`
	State     string `json:"state,omitempty"`
	Count     int    `json:"count,omitempty"`

	RescueAttempted bool `json:"rescue_attempted,omitempty"`
	RescueSucceeded bool `json:"rescue_succeeded,omitempty"`

	TaskClass     string `json:"task_class,omitempty"`
	Correction    string `json:"correction,omitempty"`
	AppliedAtTurn int    `json:"applied_at_turn,omitempty"`
}

// Stats is the aggregated view of the ring buffer, useful for
// dashboards and the GET /stats endpoint. RecentScenarios holds the
// most recent N source=scenario ops (default cap 10) so operators
// can see what the staffing scenarios are emitting at a glance.
type Stats struct {
	Total           int                `json:"total"`
	Successes       int                `json:"successes"`
	Failures        int                `json:"failures"`
	BySource        map[string]int     `json:"by_source"`
	RecentScenarios []ScenarioOpDigest `json:"recent_scenario_ops"`
}

// ScenarioOpDigest is the slim per-op shape returned in
// Stats.RecentScenarios; it matches the TS digest exactly:
// {ts, ok, staffer, kind, role}.
type ScenarioOpDigest struct {
	TS      string `json:"ts"`
	OK      bool   `json:"ok"`
	Staffer string `json:"staffer"`
	Kind    string `json:"kind"`
	Role    string `json:"role"`
}

// Errors surfaced to HTTP handlers.
var (
	ErrInvalidOp = errors.New("observer: invalid op (timestamp + endpoint required)")
)

// Validate returns an error if required fields are missing. Called
// by Record before the op is added to the ring buffer.
func (op ObservedOp) Validate() error {
	if op.Timestamp == "" {
		return ErrInvalidOp
	}
	if op.Endpoint == "" {
		return ErrInvalidOp
	}
	return nil
}

// EnsureTimestamp populates Timestamp with the current UTC ISO 8601
// time if it's empty. Useful for HTTP handlers that take the body
// as authoritative but need to default the timestamp when absent.
func (op *ObservedOp) EnsureTimestamp() {
	if op.Timestamp == "" {
		op.Timestamp = time.Now().UTC().Format(time.RFC3339)
	}
}

// DefaultSource sets Source to SourceMCP if empty. Mirrors the Rust
// `op.source ?? "mcp"` pattern in recordExternalOp.
func (op *ObservedOp) DefaultSource() {
	if op.Source == "" {
		op.Source = SourceMCP
	}
}