From e7fc63b216af45e193c113158c40490e6e5940a4 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 30 Apr 2026 08:34:36 -0500 Subject: [PATCH] observerd: /observer/inbox + multi-coord stress phase 1c (priority-ordered events) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 ask: real-world inbox-style event injection during the stress test. Coordinators in production receive emails + SMS that trigger contract responses; the substrate has to RECORD these signals AND react with a search using the embedded demand. This commit lands the endpoint and exercises it end-to-end in the stress harness. observerd surface: - New POST /observer/inbox route — accepts {type, sender, subject, body, priority, tag} and records as ObservedOp with Source=SourceInbox. Type must be email|sms; body required; priority defaults to medium. The handler ONLY records — downstream triggers (search, ingest, etc.) are the caller's concern, recorded separately. Keeps the witness role pure. - New observer.SourceInbox = "inbox" alongside SourceMCP / SourceScenario / SourceWorkflow. - Three contract tests on the new route (happy path / bad type / empty body), router-mount test extended, all green. Stress harness phase 1c (Hour 9): - 6 inbox events fire in priority order (urgent → high → medium): 2 urgent emails (forklift Cleveland, production Indianapolis) 1 high email (crane Chicago) 1 high sms (bilingual safety Indianapolis) 1 medium sms (drone Chicago) 1 medium email (warehouse Milwaukee FYI) - Each event: 1. POSTs to /v1/observer/inbox (recorded by observerd) 2. Triggers matrix.search using a parsed demand (the demand extraction is hard-coded for now; production needs a small LLM to parse from body) 3. Captures both as events in the run JSON Run #006 result (with v2-moe embedder + all phases including inbox): Diversity: Same-role-across-contracts Jaccard = 0.000 (n=9) Different-roles-same-contract Jaccard = 0.046 (n=18) Determinism: 1.000 Verbatim handover: 4/4 (100%) Paraphrase handover: 4/4 (100%) Inbox burst: 6/6 events accepted by observerd (200 status, all recorded) 6/6 triggered searches produced distinct top-1 worker IDs distance distribution: 0.24 (Indy production) → 0.71 (Chicago drone surveyor — honest stretch since drones aren't in the 5K-worker corpus, system surfaces closest neighbor at high distance rather than fabricating) The drone-Chicago case is the architectural-honesty signal: when the demand asks for a specialist NOT in the roster, the system returns the closest semantic neighbor with a distance that flags "this is a stretch." Coordinators reading distances see "we don't have a great match here" rather than a confident wrong answer. Total events captured: 67 (was 61 pre-inbox). Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/observerd/main.go | 56 ++++++++ cmd/observerd/main_test.go | 47 +++++++ internal/observer/types.go | 6 + .../reality-tests/multi_coord_stress_006.md | 82 +++++++++++ scripts/multi_coord_stress/main.go | 130 ++++++++++++++++++ 5 files changed, 321 insertions(+) create mode 100644 reports/reality-tests/multi_coord_stress_006.md diff --git a/cmd/observerd/main.go b/cmd/observerd/main.go index a95239c..5f32d73 100644 --- a/cmd/observerd/main.go +++ b/cmd/observerd/main.go @@ -93,6 +93,62 @@ func (h *handlers) register(r chi.Router) { r.Post("/observer/event", h.handleEvent) r.Post("/observer/workflow/run", h.handleWorkflowRun) r.Get("/observer/workflow/modes", h.handleWorkflowModes) + r.Post("/observer/inbox", h.handleInbox) +} + +// inboxMessage is the POST /observer/inbox body — an incoming +// real-world signal (email or SMS) that a coordinator would receive +// and act on. The handler only RECORDS it as an ObservedOp; whether +// to trigger a downstream matrix.search or workflow is the caller's +// concern. Keeps observer's witness role pure. +type inboxMessage struct { + Type string `json:"type"` // "email" | "sms" + Sender string `json:"sender"` + Subject string `json:"subject,omitempty"` + Body string `json:"body"` + Priority string `json:"priority"` // "urgent" | "high" | "medium" | "low" + Tag string `json:"tag,omitempty"` +} + +func (h *handlers) handleInbox(w http.ResponseWriter, r *http.Request) { + var msg inboxMessage + if !decodeJSON(w, r, &msg) { + return + } + if msg.Type != "email" && msg.Type != "sms" { + http.Error(w, "type must be 'email' or 'sms'", http.StatusBadRequest) + return + } + if strings.TrimSpace(msg.Body) == "" { + http.Error(w, "body required", http.StatusBadRequest) + return + } + if msg.Priority == "" { + msg.Priority = "medium" + } + op := observer.ObservedOp{ + Endpoint: "/observer/inbox/" + msg.Type, + InputSummary: fmt.Sprintf("from=%s priority=%s tag=%s subject=%s", msg.Sender, msg.Priority, msg.Tag, msg.Subject), + OutputSummary: msg.Body, + Source: observer.SourceInbox, + Success: true, + } + if err := h.store.Record(op); err != nil { + if errors.Is(err, observer.ErrInvalidOp) { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + slog.Error("observer record inbox", "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + stats := h.store.Stats() + writeJSON(w, http.StatusOK, map[string]any{ + "accepted": true, + "type": msg.Type, + "priority": msg.Priority, + "ring_size": stats.Total, + }) } func (h *handlers) handleStats(w http.ResponseWriter, _ *http.Request) { diff --git a/cmd/observerd/main_test.go b/cmd/observerd/main_test.go index 396e0c7..9609de2 100644 --- a/cmd/observerd/main_test.go +++ b/cmd/observerd/main_test.go @@ -4,6 +4,7 @@ import ( "bytes" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -38,6 +39,7 @@ func TestRoutesMounted(t *testing.T) { "POST /observer/event": false, "POST /observer/workflow/run": false, "GET /observer/workflow/modes": false, + "POST /observer/inbox": false, } _ = chi.Walk(r, func(method, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error { key := method + " " + route @@ -165,6 +167,51 @@ func TestWorkflowRun_AllProvenanceRecordedPostRun(t *testing.T) { } } +// TestInbox_AcceptsValidEmail locks the happy-path contract for the +// /observer/inbox route — accepts an email message with required +// fields, records as ObservedOp, returns 200 with ring-size. +func TestInbox_AcceptsValidEmail(t *testing.T) { + r := newTestRouter(t) + body := []byte(`{"type":"email","sender":"client@northstar.com","subject":"URGENT: 50 forklift ops","body":"Need 50 forklift operators in Cleveland OH for next week. Day shift.","priority":"urgent","tag":"alpha-surge"}`) + req := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d (body=%s)", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), `"accepted":true`) { + t.Errorf("expected accepted=true, got %s", w.Body.String()) + } +} + +// TestInbox_RejectsBadType locks the validation: type must be +// "email" or "sms", anything else is 400. +func TestInbox_RejectsBadType(t *testing.T) { + r := newTestRouter(t) + body := []byte(`{"type":"smoke-signal","sender":"x","body":"y","priority":"high"}`) + req := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 on bad type, got %d", w.Code) + } +} + +// TestInbox_RejectsEmptyBody locks the body-required invariant. +func TestInbox_RejectsEmptyBody(t *testing.T) { + r := newTestRouter(t) + body := []byte(`{"type":"email","sender":"x","body":"","priority":"high"}`) + req := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 on empty body, got %d", w.Code) + } +} + // TestWorkflowRun_UnknownMode locks the 400 path on workflow definitions // that reference modes not registered with the runner. The harness's // reality test runs depend on this so an unknown-mode misconfiguration diff --git a/internal/observer/types.go b/internal/observer/types.go index 759000b..9e85eda 100644 --- a/internal/observer/types.go +++ b/internal/observer/types.go @@ -41,6 +41,12 @@ const ( // the workflow handler was casting a string literal to Source, // which worked coincidentally but left the taxonomy implicit. SourceWorkflow Source = "workflow" + // SourceInbox tags ObservedOps emitted by /observer/inbox — incoming + // real-world signals (email, SMS) that a coordinator would receive + // and act on. The handler only RECORDS the message; downstream + // triggers (e.g. matrix.search on the parsed demand) are the + // caller's concern, recorded separately. + SourceInbox Source = "inbox" ) // ObservedOp is one entry in the observer's ring buffer (and JSONL diff --git a/reports/reality-tests/multi_coord_stress_006.md b/reports/reality-tests/multi_coord_stress_006.md new file mode 100644 index 0000000..8da8736 --- /dev/null +++ b/reports/reality-tests/multi_coord_stress_006.md @@ -0,0 +1,82 @@ +# Multi-Coordinator Stress Test — Run 006 + +**Generated:** 2026-04-30T13:33:24.568124731Z +**Coordinators:** alice / bob / carol (each with own playbook namespace: `playbook_alice` / `playbook_bob` / `playbook_carol`) +**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction +**Corpora:** `workers,ethereal_workers` +**K per query:** 8 +**Total events captured:** 67 +**Evidence:** `reports/reality-tests/multi_coord_stress_006.json` + +--- + +## Diversity — is the system locking into scenarios or cycling? + +| Metric | Mean Jaccard | n pairs | Interpretation | +|---|---:|---:|---| +| Same role across different contracts | 0 | 9 | Lower = more diverse (different region/cert mix → different workers) | +| Different roles within same contract | 0.04603174603174603 | 18 | Should be near-zero (different roles = different worker pools) | + +**Healthy ranges:** +- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract. +- Different roles same contract: < 0.10 means role-specific retrieval is working. +- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent. + +--- + +## Determinism — same query reissued, top-K stability + +| Metric | Value | +|---|---:| +| Mean Jaccard on retrieval-only reissue | 1 | +| Number of reissue pairs | 12 | + +**Interpretation:** +- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query." +- 0.80 – 0.95: Some HNSW or embed variance, acceptable. +- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall). + +--- + +## Learning — handover hit rate + +Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results? + +| Metric | Value | +|---|---:| +| Verbatim handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (verbatim) | 4 | +| Alice's recorded answer in Bob's top-K (verbatim) | 4 | +| **Verbatim handover hit rate (top-1)** | **1** | +| Paraphrase handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (paraphrase) | 4 | +| Alice's recorded answer in Bob's top-K (paraphrase) | 4 | +| **Paraphrase handover hit rate (top-1)** | **1** | + +**Interpretation:** +- Verbatim hit rate ≈ 1.0: trivial case — Bob runs identical queries; should always hit. +- Paraphrase hit rate ≥ 0.5: institutional memory survives wording change — the harder learning property. +- Paraphrase hit rate ≈ 0.0: Bob's paraphrases drift past the inject threshold, so Alice's recordings don't activate. Same caveat as the playbook_lift paraphrase pass. + +--- + +## Per-event capture + +All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. Search by phase: + +```bash +jq '.events[] | select(.phase == "merge")' reports/reality-tests/multi_coord_stress_006.json +jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' reports/reality-tests/multi_coord_stress_006.json +jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' reports/reality-tests/multi_coord_stress_006.json +``` + +--- + +## What's NOT in this run (Phase 1 deliberately defers) + +- **48-hour clock.** Events fire as discrete steps, not on a timeline. +- **Email / SMS ingest.** No endpoints exist on the Go side yet. +- **New-resume injection mid-run.** The corpus is fixed at the start. +- **Langfuse traces.** Need Go-side wiring. + +These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of. diff --git a/scripts/multi_coord_stress/main.go b/scripts/multi_coord_stress/main.go index bf6dd1e..6816b2d 100644 --- a/scripts/multi_coord_stress/main.go +++ b/scripts/multi_coord_stress/main.go @@ -33,6 +33,7 @@ import ( "net/http" "os" "path/filepath" + "sort" "strings" "time" ) @@ -304,6 +305,97 @@ func main() { output.Events = append(output.Events, ev) } + // ── Phase 1c: inbox burst (Hour 9) ────────────────────────── + // Mid-morning, 6 incoming signals arrive — emails + SMS — each + // carrying a structured demand for the system to act on. Events + // fire in PRIORITY ORDER (urgent → high → medium). For each, we: + // 1. POST to /v1/observer/inbox so the witness loop records it + // 2. Run matrix.search using the embedded demand + // 3. Capture both as events + // + // Priority weighting matters because real coordinators triage + // urgent client-side asks before medium-priority background + // signals. The substrate doesn't enforce ordering today (callers + // fire in their preferred order); this phase verifies the + // recording surface and the search-from-inbox flow work. + log.Printf("[stress] phase 1c: inbox burst (6 events, priority-ordered)") + type inboxEvent struct { + Priority string // "urgent" | "high" | "medium" | "low" + Type string // "email" | "sms" + Sender string + Subject string + Body string + // DemandQuery is the parsed demand we'd extract from the body. + // In production a small LLM would parse this; here we fix it + // up-front to keep the test deterministic. + DemandQuery string + Coord string + } + inboxEvents := []inboxEvent{ + { + Priority: "urgent", Type: "email", Sender: "ops@northstar.com", + Subject: "URGENT: 50 forklift operators Cleveland Monday", + Body: "Need 50 forklift operators in Cleveland OH for Monday day shift. OSHA-30 + active forklift cert required. Current Milwaukee batch cannot relocate.", + DemandQuery: "Forklift operator Cleveland OH OSHA-30 forklift certification day shift", + Coord: "alice", + }, + { + Priority: "urgent", Type: "email", Sender: "client@crossroads-mfg.com", + Subject: "URGENT: Production line down — need 30 production workers tonight", + Body: "Production line failure at Indianapolis IN site. Need 30 production workers swing shift starting tonight. Assembly + machine operation experience required.", + DemandQuery: "Production worker Indianapolis IN swing shift assembly machine operation", + Coord: "bob", + }, + { + Priority: "high", Type: "email", Sender: "supervisor@loop-construction.com", + Subject: "Need crane operator Chicago for 2-week project", + Body: "Crane operator with NCCCO certification needed for 2-week Chicago IL site project. Day shift. Mobile crane experience preferred.", + DemandQuery: "Crane operator NCCCO certification Chicago IL mobile crane day shift", + Coord: "carol", + }, + { + Priority: "high", Type: "sms", Sender: "+1-555-0142", + Body: "Bilingual safety coord needed Indy plant ASAP. Spanish + English. OSHA trainer credential.", + DemandQuery: "Bilingual Spanish English safety coordinator Indianapolis OSHA trainer", + Coord: "bob", + }, + { + Priority: "medium", Type: "sms", Sender: "+1-555-0188", + Body: "Drone surveyor for Chicago site progress mapping. FAA Part 107.", + DemandQuery: "FAA Part 107 drone surveyor Chicago site mapping", + Coord: "carol", + }, + { + Priority: "medium", Type: "email", Sender: "scheduling@northstar.com", + Subject: "FYI: warehouse worker capacity check Milwaukee", + Body: "Routine capacity check on Milwaukee warehouse worker pool — anyone with cold storage experience for next week?", + DemandQuery: "Warehouse worker Milwaukee cold storage", + Coord: "alice", + }, + } + // Sort by priority (urgent < high < medium < low for ordering). + prioRank := map[string]int{"urgent": 0, "high": 1, "medium": 2, "low": 3} + sort.SliceStable(inboxEvents, func(i, j int) bool { + return prioRank[inboxEvents[i].Priority] < prioRank[inboxEvents[j].Priority] + }) + for _, ie := range inboxEvents { + // 1. Record inbox event at observerd + if err := postInbox(hc, *gateway, ie.Type, ie.Sender, ie.Subject, ie.Body, ie.Priority, ie.Coord); err != nil { + log.Printf(" inbox record failed (%s): %v", ie.Priority, err) + continue + } + // 2. Triggered matrix.search using the parsed demand. + coord := coordByName(coords, ie.Coord) + resp, err := matrixSearch(hc, *gateway, ie.DemandQuery, corpora, *k, true, coord.PlaybookCorpus) + if err != nil { + log.Printf(" inbox-triggered search failed (%s): %v", ie.Priority, err) + continue + } + ev := captureEvent("inbox-triggered-search", 9, ie.Coord, "inbox-burst", ie.Subject, ie.DemandQuery, 1, true, coord.PlaybookCorpus, resp) + ev.Note = fmt.Sprintf("inbox %s/%s from %s", ie.Type, ie.Priority, ie.Sender) + output.Events = append(output.Events, ev) + } + // ── Phase 2: surge ────────────────────────────────────────── // Each coord's contract demand doubles. URGENT phrasing. log.Printf("[stress] phase 2: surge (2x demand, urgent phrasing)") @@ -838,6 +930,44 @@ func ingestFreshWorker(hc *http.Client, gw, id, text string, metadata map[string return nil } +// postInbox sends an inbox message (email or SMS) to observerd via +// the gateway. observerd records it as an ObservedOp with +// Source=SourceInbox; downstream actions (search, ingest, etc.) are +// the caller's concern. +func postInbox(hc *http.Client, gw, msgType, sender, subject, body, priority, tag string) error { + bodyJSON, _ := json.Marshal(map[string]any{ + "type": msgType, + "sender": sender, + "subject": subject, + "body": body, + "priority": priority, + "tag": tag, + }) + req, _ := http.NewRequest("POST", gw+"/v1/observer/inbox", bytes.NewReader(bodyJSON)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + rb, _ := io.ReadAll(resp.Body) + return fmt.Errorf("inbox %d: %s", resp.StatusCode, string(rb)) + } + return nil +} + +// coordByName looks up a coordinator by name. Used by inbox-triggered +// searches that route based on the email's tagged coordinator. +func coordByName(coords []Coordinator, name string) Coordinator { + for _, c := range coords { + if c.Name == name { + return c + } + } + return coords[0] +} + func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, score float64, corpus string) error { body, _ := json.Marshal(map[string]any{ "query_text": query,