root e7fc63b216 observerd: /observer/inbox + multi-coord stress phase 1c (priority-ordered events)
Phase 3 ask: real-world inbox-style event injection during the stress
test. Coordinators in production receive emails + SMS that trigger
contract responses; the substrate has to RECORD these signals AND
react with a search using the embedded demand. This commit lands the
endpoint and exercises it end-to-end in the stress harness.

observerd surface:
- New POST /observer/inbox route — accepts {type, sender, subject,
  body, priority, tag} and records as ObservedOp with
  Source=SourceInbox. Type must be email|sms; body required;
  priority defaults to medium. The handler ONLY records — downstream
  triggers (search, ingest, etc.) are the caller's concern, recorded
  separately. Keeps the witness role pure.
- New observer.SourceInbox = "inbox" alongside SourceMCP /
  SourceScenario / SourceWorkflow.
- Three contract tests on the new route (happy path / bad type / empty
  body), router-mount test extended, all green.

Stress harness phase 1c (Hour 9):
- 6 inbox events fire in priority order (urgent → high → medium):
    2 urgent emails (forklift Cleveland, production Indianapolis)
    1 high email (crane Chicago)
    1 high sms (bilingual safety Indianapolis)
    1 medium sms (drone Chicago)
    1 medium email (warehouse Milwaukee FYI)
- Each event:
    1. POSTs to /v1/observer/inbox (recorded by observerd)
    2. Triggers matrix.search using a parsed demand (the demand
       extraction is hard-coded for now; production will need a small
       LLM to parse the demand from the message body)
    3. Captures both as events in the run JSON

Run #006 result (with v2-moe embedder + all phases including inbox):

  Diversity:
    Same-role-across-contracts Jaccard = 0.000 (n=9)
    Different-roles-same-contract Jaccard = 0.046 (n=18)
  Determinism: 1.000
  Verbatim handover: 4/4 (100%)
  Paraphrase handover: 4/4 (100%)
  Inbox burst:
    6/6 events accepted by observerd (200 status, all recorded)
    6/6 triggered searches produced distinct top-1 worker IDs
    distance distribution: 0.24 (Indy production) → 0.71 (Chicago
    drone surveyor — an honest stretch: drones aren't in the
    5K-worker corpus, so the system surfaces the closest neighbor at
    a high distance rather than fabricating a match)

The drone-Chicago case is the architectural-honesty signal: when
the demand asks for a specialist NOT in the roster, the system
returns the closest semantic neighbor with a distance that flags
"this is a stretch." Coordinators reading distances see "we don't
have a great match here" rather than a confident wrong answer.

Total events captured: 67 (was 61 pre-inbox).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 08:34:36 -05:00

230 lines
8.0 KiB
Go

package main
import (
"bytes"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/observer"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/workflow"
)
// newTestRouter assembles a fresh observerd router for a test. The
// handlers are backed by observer.NewStore(nil) (the in-memory store)
// and by a workflow runner that has no modes registered. Closes R-005
// for observerd.
//
// The return type is chi.Router, not http.Handler, so callers can pass
// the result directly to chi.Walk. A type assertion at the call site
// would panic if a later refactor wrapped the router in plain net/http
// middleware.
func newTestRouter(t *testing.T) chi.Router {
	t.Helper()

	router := chi.NewRouter()
	deps := &handlers{
		store:  observer.NewStore(nil),
		runner: workflow.NewRunner(),
	}
	deps.register(router)

	return router
}
// TestRoutesMounted walks the router built by newTestRouter and checks
// that every expected "METHOD /path" key was visited, i.e. that each
// route is registered.
func TestRoutesMounted(t *testing.T) {
	r := newTestRouter(t)

	// A slice (not a map) so that missing routes are reported in a
	// stable order; ranging over a map would shuffle the failure output
	// from run to run.
	want := []string{
		"GET /observer/stats",
		"POST /observer/event",
		"POST /observer/workflow/run",
		"GET /observer/workflow/modes",
		"POST /observer/inbox",
	}

	mounted := make(map[string]bool, len(want))
	err := chi.Walk(r, func(method, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error {
		mounted[method+" "+route] = true
		return nil
	})
	if err != nil {
		// If the walk itself failed, the visited set is incomplete and
		// any "route not mounted" report below would be misleading.
		t.Fatalf("walking routes: %v", err)
	}

	for _, key := range want {
		if !mounted[key] {
			t.Errorf("route not mounted: %s", key)
		}
	}
}
// TestStats_GET checks that GET /observer/stats answers 200 on a
// freshly built router.
func TestStats_GET(t *testing.T) {
	rec := httptest.NewRecorder()
	request := httptest.NewRequest("GET", "/observer/stats", nil)

	newTestRouter(t).ServeHTTP(rec, request)

	if status := rec.Code; status != http.StatusOK {
		t.Errorf("expected 200, got %d", status)
	}
}
// TestWorkflowModes_GET checks that GET /observer/workflow/modes
// answers 200 on a freshly built router.
func TestWorkflowModes_GET(t *testing.T) {
	rec := httptest.NewRecorder()
	request := httptest.NewRequest("GET", "/observer/workflow/modes", nil)

	newTestRouter(t).ServeHTTP(rec, request)

	if status := rec.Code; status != http.StatusOK {
		t.Errorf("expected 200, got %d", status)
	}
}
// TestEvent_InvalidOp pins the validation path of POST /observer/event:
// an ObservedOp that lacks its required fields must be answered with
// 400, never 500. The assertion guards against a refactor that lets
// observer.ErrInvalidOp fall through to the 500 branch, which would
// show clients "internal" instead of the real validation error.
func TestEvent_InvalidOp(t *testing.T) {
	// An empty JSON object has no endpoint and no source, so it fails
	// ObservedOp validation.
	payload := bytes.NewReader([]byte(`{}`))

	request := httptest.NewRequest("POST", "/observer/event", payload)
	request.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	newTestRouter(t).ServeHTTP(rec, request)

	if rec.Code == http.StatusBadRequest {
		return
	}
	t.Errorf("expected 400 on invalid op, got %d (body=%s)", rec.Code, rec.Body.String())
}
// TestWorkflowRun_AllProvenanceRecordedPostRun proves the gap ratified
// in ADR-005 Decision 5.3: handleWorkflowRun calls runner.Run
// synchronously and only records ObservedOps from the returned
// RunResult AFTER Run completes. A crash mid-Run would lose ALL
// provenance for that workflow.
//
// The test blocks inside a node, samples the observer store (must be
// 0), unblocks, then samples again (must be 2). It is an intentional
// test-as-spec lock: if recording ever starts happening while Run is
// still in flight, the change should show up in `go test` instead of
// surfacing under load.
//
// Synchronisation is explicit rather than sleep-based: the mode signals
// on `entered` once it is running, so the mid-run sample is guaranteed
// to be taken while a node is actually blocked and cannot pass
// vacuously because the handler goroutine had not started yet.
func TestWorkflowRun_AllProvenanceRecordedPostRun(t *testing.T) {
	// Upper bound for each wait, so a wedged runner fails this test
	// quickly instead of stalling until the global `go test` timeout.
	const waitLimit = 5 * time.Second

	pauseCh := make(chan struct{})
	// One token per mode invocation. Capacity 2 covers both nodes, and
	// the send below is non-blocking so an unexpected extra invocation
	// cannot wedge the runner on this channel.
	entered := make(chan struct{}, 2)

	runner := workflow.NewRunner()
	runner.RegisterMode("test.pause", func(_ workflow.Context, _ map[string]any) (map[string]any, error) {
		select {
		case entered <- struct{}{}:
		default:
		}
		<-pauseCh
		return map[string]any{"unpaused": true}, nil
	})
	h := &handlers{
		store:  observer.NewStore(nil),
		runner: runner,
	}
	r := chi.NewRouter()
	h.register(r)

	// Two-node serial workflow so we have something to record post-run.
	body := []byte(`{"workflow":{"name":"adr_005_5_3","nodes":[
	{"id":"n1","mode":"test.pause"},
	{"id":"n2","mode":"test.pause","depends_on":["n1"]}
	]}}`)

	// Send the request in a goroutine — it blocks until pauseCh closes.
	// done is buffered so the goroutine can always deliver its status
	// and exit, even when the test has already bailed out.
	done := make(chan int, 1)
	go func() {
		req := httptest.NewRequest("POST", "/observer/workflow/run", bytes.NewReader(body))
		req.Header.Set("Content-Type", "application/json")
		w := httptest.NewRecorder()
		r.ServeHTTP(w, req)
		done <- w.Code
	}()

	// Wait until a node is actually executing and parked on pauseCh.
	select {
	case <-entered:
	case <-time.After(waitLimit):
		close(pauseCh) // release anything that might still arrive
		t.Fatalf("runner did not enter the first node within %s", waitLimit)
	}

	// LOCK: store MUST be empty while runner.Run is paused.
	// NOTE(review): this sample is taken while the first node is still
	// blocked, so it catches recording that happens before or on node
	// entry. Recording on node *completion* would only be visible if
	// sampled after n1 returns and before n2 is released — consider
	// extending the test if that distinction matters for ADR-005 5.3.
	if got := h.store.Stats().Total; got != 0 {
		t.Errorf("expected 0 observer ops during paused run, got %d "+
			"(if non-zero, ADR-005 Decision 5.3 must be updated — recording "+
			"is no longer post-run-only)", got)
	}

	// Unblock all paused nodes (channel close broadcasts to all receivers).
	close(pauseCh)

	// Wait for the handler to return + record post-run.
	select {
	case code := <-done:
		if code != http.StatusOK {
			t.Errorf("workflow run failed: HTTP %d", code)
		}
	case <-time.After(waitLimit):
		t.Fatalf("handler did not return within %s after unpausing", waitLimit)
	}

	// LOCK: store MUST have 2 ops after run completes.
	if got := h.store.Stats().Total; got != 2 {
		t.Errorf("expected 2 observer ops after run, got %d", got)
	}
}
// TestInbox_AcceptsValidEmail pins the happy path of POST
// /observer/inbox: a well-formed email message (type, sender, subject,
// body, priority, tag) must be answered with 200 and a response body
// that contains "accepted":true.
func TestInbox_AcceptsValidEmail(t *testing.T) {
	const payload = `{"type":"email","sender":"client@northstar.com","subject":"URGENT: 50 forklift ops","body":"Need 50 forklift operators in Cleveland OH for next week. Day shift.","priority":"urgent","tag":"alpha-surge"}`

	request := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader([]byte(payload)))
	request.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	newTestRouter(t).ServeHTTP(rec, request)

	// Fatal on a wrong status: the body check below is meaningless
	// for a rejected request.
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d (body=%s)", rec.Code, rec.Body.String())
	}
	if reply := rec.Body.String(); !strings.Contains(reply, `"accepted":true`) {
		t.Errorf("expected accepted=true, got %s", reply)
	}
}
// TestInbox_RejectsBadType pins the type validation of POST
// /observer/inbox: only "email" and "sms" are acceptable, so an
// unknown type must be answered with 400.
func TestInbox_RejectsBadType(t *testing.T) {
	const payload = `{"type":"smoke-signal","sender":"x","body":"y","priority":"high"}`

	request := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader([]byte(payload)))
	request.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	newTestRouter(t).ServeHTTP(rec, request)

	if status := rec.Code; status != http.StatusBadRequest {
		t.Errorf("expected 400 on bad type, got %d", status)
	}
}
// TestInbox_RejectsEmptyBody pins the body-required invariant of POST
// /observer/inbox: a message whose "body" field is the empty string
// must be answered with 400.
func TestInbox_RejectsEmptyBody(t *testing.T) {
	const payload = `{"type":"email","sender":"x","body":"","priority":"high"}`

	request := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader([]byte(payload)))
	request.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	newTestRouter(t).ServeHTTP(rec, request)

	if status := rec.Code; status != http.StatusBadRequest {
		t.Errorf("expected 400 on empty body, got %d", status)
	}
}
// TestWorkflowRun_UnknownMode pins the 400 path of POST
// /observer/workflow/run for a workflow definition that names a mode
// the runner does not have registered. The harness's reality test runs
// rely on this: an unknown-mode misconfiguration has to surface as a
// definition error, not as a server error.
func TestWorkflowRun_UnknownMode(t *testing.T) {
	const payload = `{"workflow":{"name":"t","nodes":[{"id":"n1","mode":"does.not.exist"}]}}`

	request := httptest.NewRequest("POST", "/observer/workflow/run", bytes.NewReader([]byte(payload)))
	request.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	newTestRouter(t).ServeHTTP(rec, request)

	if rec.Code == http.StatusBadRequest {
		return
	}
	t.Errorf("expected 400 on unknown mode, got %d (body=%s)", rec.Code, rec.Body.String())
}