From 6c93a38093b89675a547c8bf44afe257325fa24f Mon Sep 17 00:00:00 2001 From: root Date: Thu, 30 Apr 2026 17:42:07 -0500 Subject: [PATCH] scrum multi_coord_phase3: 4 fixes from cross-lineage review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cross-lineage scrum on bundle 87cbd10..f971e64 (3,652 lines) produced 4 actionable findings, all defensive hardening. 1. (Opus WARN) internal/langfuse/client.go:queue Synchronous Flush at maxBatch threshold blocked the calling goroutine for the full 5s HTTP timeout when Langfuse hiccupped, defeating the "best-effort, never blocks calling path" contract in the package doc. Now fire-and-forget via goroutine. 2. (Opus + Kimi convergent) cmd/observerd/main.go:handleInbox - Free-form priority string was accepted; "nonsense" passed through unchecked. Now closed enum: urgent|high|medium|low (+ empty defaults to medium). Tested: TestInbox_RejectsBadPriority. - No size cap on body, only emptiness check; multi-MB payloads would bloat observer's ring + JSONL. Now 8 KiB cap returns 413. Tested: TestInbox_RejectsOversizedBody. - Subject/sender/tag concatenated into InputSummary without newline stripping; embedded \n could corrupt JSONL line-based parsers. New sanitizeInboxField replaces \r and \n with spaces + caps at 256 bytes before interpolation. 3. (Opus INFO) scripts/multi_coord_stress/main.go Removed dead `must[T]` generic — tracedSearch took over the fail-fast role for matrix searches, so the helper became unused. 4. (Opus INFO) scripts/multi_coord_stress/main.go:Event `JudgeRating int` collapsed "judge errored" and "judge said unrated" both to 0. Changed to *int — nil = judge didn't run or errored, 1-5 = verdict. judgeInboxResult still returns 0 on error; caller gates on > 0 before assigning. Dismissed (with rationale): - Opus WARN ExcludeIDs ordering: verified by code read — filter applies after sort + before top-K truncation as documented; no slot waste possible.
- Opus INFO 10 prior-run reports contradict #011: those are point-in-time snapshots; intentional history. - Kimi INFO Langfuse error suppression: design intent (best-effort per package doc). - Kimi INFO contract schema validation: defer until contract count grows enough to make hand-edit drift a real risk. - Kimi INFO paraphrase prompt duplicated across lift + multi_coord: defer (lift to internal/paraphrase/ when a third consumer appears). - Qwen HOLD: single-line, no actionable finding. go test ./cmd/observerd ./internal/langfuse all green; multi_coord driver builds clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/observerd/main.go | 50 +++++++++++++++++++++++++++++- cmd/observerd/main_test.go | 29 +++++++++++++++++ internal/langfuse/client.go | 7 ++++- scripts/multi_coord_stress/main.go | 19 ++++++------ 4 files changed, 94 insertions(+), 11 deletions(-) diff --git a/cmd/observerd/main.go b/cmd/observerd/main.go index 5f32d73..9d4e911 100644 --- a/cmd/observerd/main.go +++ b/cmd/observerd/main.go @@ -110,6 +110,41 @@ type inboxMessage struct { Tag string `json:"tag,omitempty"` } +// validInboxPriorities is the closed set of priority values. Per +// scrum (Kimi WARN): a free-form priority string was accepted, so +// downstream consumers parsing on exact enum could see "nonsense" +// values. Now rejected at the boundary. +var validInboxPriorities = map[string]bool{ + "": true, // empty defaults to medium below + "urgent": true, + "high": true, + "medium": true, + "low": true, +} + +// inboxMaxBodyChars caps the inbox body at a sane size so a large +// payload doesn't bloat observer's ring buffer or JSONL log. Per +// scrum (Opus WARN): only emptiness was checked, so a multi-MB body +// would be accepted unconditionally. +const inboxMaxBodyChars = 8 * 1024 + +// inboxMaxFieldChars caps subject/sender/tag at a much tighter size. 
+// These appear inside the InputSummary string + JSONL log entry, so +// long values bloat every record and embedded newlines corrupt +// downstream line-based parsers. +const inboxMaxFieldChars = 256 + +func sanitizeInboxField(s string) string { + // Strip newlines so multi-line attacker-controlled strings don't + // corrupt the JSONL log (one-line-per-op invariant). + s = strings.ReplaceAll(s, "\n", " ") + s = strings.ReplaceAll(s, "\r", " ") + if len(s) > inboxMaxFieldChars { + s = s[:inboxMaxFieldChars] + } + return s +} + func (h *handlers) handleInbox(w http.ResponseWriter, r *http.Request) { var msg inboxMessage if !decodeJSON(w, r, &msg) { @@ -123,12 +158,25 @@ func (h *handlers) handleInbox(w http.ResponseWriter, r *http.Request) { http.Error(w, "body required", http.StatusBadRequest) return } + if len(msg.Body) > inboxMaxBodyChars { + http.Error(w, fmt.Sprintf("body exceeds %d chars", inboxMaxBodyChars), http.StatusRequestEntityTooLarge) + return + } + if !validInboxPriorities[msg.Priority] { + http.Error(w, "priority must be urgent|high|medium|low", http.StatusBadRequest) + return + } if msg.Priority == "" { msg.Priority = "medium" } + // Sanitize the fields that go into the InputSummary string so + // newlines don't corrupt the JSONL log. 
+ sender := sanitizeInboxField(msg.Sender) + subject := sanitizeInboxField(msg.Subject) + tag := sanitizeInboxField(msg.Tag) op := observer.ObservedOp{ Endpoint: "/observer/inbox/" + msg.Type, - InputSummary: fmt.Sprintf("from=%s priority=%s tag=%s subject=%s", msg.Sender, msg.Priority, msg.Tag, msg.Subject), + InputSummary: fmt.Sprintf("from=%s priority=%s tag=%s subject=%s", sender, msg.Priority, tag, subject), OutputSummary: msg.Body, Source: observer.SourceInbox, Success: true, diff --git a/cmd/observerd/main_test.go b/cmd/observerd/main_test.go index 9609de2..f08ce5b 100644 --- a/cmd/observerd/main_test.go +++ b/cmd/observerd/main_test.go @@ -212,6 +212,35 @@ func TestInbox_RejectsEmptyBody(t *testing.T) { } } +// TestInbox_RejectsBadPriority locks the priority enum (per +// scrum Kimi WARN — free-form priority strings were accepted). +func TestInbox_RejectsBadPriority(t *testing.T) { + r := newTestRouter(t) + body := []byte(`{"type":"email","sender":"x","body":"y","priority":"nonsense"}`) + req := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 on bad priority, got %d", w.Code) + } +} + +// TestInbox_RejectsOversizedBody locks the body-size cap (per scrum +// Opus WARN — only emptiness was checked before). 
+func TestInbox_RejectsOversizedBody(t *testing.T) { + r := newTestRouter(t) + bigBody := strings.Repeat("x", inboxMaxBodyChars+1) + body := []byte(`{"type":"email","sender":"x","body":"` + bigBody + `","priority":"high"}`) + req := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusRequestEntityTooLarge { + t.Errorf("expected 413 on oversized body, got %d", w.Code) + } +} + // TestWorkflowRun_UnknownMode locks the 400 path on workflow definitions // that reference modes not registered with the runner. The harness's // reality test runs depend on this so an unknown-mode misconfiguration diff --git a/internal/langfuse/client.go b/internal/langfuse/client.go index faef4ad..39ba61a 100644 --- a/internal/langfuse/client.go +++ b/internal/langfuse/client.go @@ -171,7 +171,12 @@ func (c *Client) queue(e event) { shouldFlush := len(c.pending) >= c.maxBatch c.mu.Unlock() if shouldFlush { - _ = c.Flush(context.Background()) + // Fire-and-forget so the calling goroutine isn't blocked by + // the 5s HTTP timeout when Langfuse hiccups. Per scrum (Opus + // WARN): synchronous flush from queue defeated the + // "best-effort, never blocks calling path" guarantee in the + // package doc. + go func() { _ = c.Flush(context.Background()) }() } } diff --git a/scripts/multi_coord_stress/main.go b/scripts/multi_coord_stress/main.go index 73f2f49..a88ee55 100644 --- a/scripts/multi_coord_stress/main.go +++ b/scripts/multi_coord_stress/main.go @@ -117,8 +117,10 @@ type Event struct { // original inbox body (not the LLM-parsed query). Lets us flag // the case where LLM parsing produces a tight-distance match // but the result doesn't actually fit the original ask. - // 0 = unrated, 1-5 = judge verdict. - JudgeRating int `json:"judge_rating,omitempty"` + // nil = judge didn't run (or errored); 1-5 = judge verdict. 
+ // Pointer (per scrum Opus INFO) so "errored" and "unrated" + // don't collide on the int zero value. + JudgeRating *int `json:"judge_rating,omitempty"` Note string `json:"note,omitempty"` TimestampUnixNano int64 `json:"ts_ns"` } @@ -532,7 +534,12 @@ func main() { if len(resp.Results) > 0 { judgeStart := time.Now() rating := judgeInboxResult(hc, *ollama, *judgeModel, ie.Body, resp.Results[0]) - ev.JudgeRating = rating + if rating > 0 { + // Only set when we got an actual verdict (1-5); a 0 + // from judgeInboxResult means decode/HTTP error and + // we want absent-from-JSON to flag that distinctly. + ev.JudgeRating = &rating + } emitSpan("llm.judge_top1", judgeStart, map[string]any{"original_body": ie.Body, "top1_id": resp.Results[0].ID, "top1_corpus": resp.Results[0].Corpus}, map[string]any{"rating": rating}, "") @@ -1389,9 +1396,3 @@ func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, s return nil } -func must[T any](v T, err error) T { - if err != nil { - log.Fatalf("[stress] %v", err) - } - return v -}