scrum multi_coord_phase3: 4 fixes from cross-lineage review
Cross-lineage scrum on bundle 87cbd10..f971e64 (3,652 lines)
produced 4 actionable findings, all defensive hardening.
1. (Opus WARN) internal/langfuse/client.go:queue
Synchronous Flush at maxBatch threshold blocked the calling
goroutine for the full 5s HTTP timeout when Langfuse hiccupped,
defeating the "best-effort, never blocks calling path" contract
in the package doc. Now fire-and-forget via goroutine.
2. (Opus + Kimi convergent) cmd/observerd/main.go:handleInbox
- Free-form priority string was accepted; "nonsense" passed
through unchecked. Now closed enum: urgent|high|medium|low (+
empty defaults to medium). Tested: TestInbox_RejectsBadPriority.
- No size cap on body, only emptiness check; multi-MB payloads
would bloat observer's ring + JSONL. Now 8 KiB cap returns 413.
Tested: TestInbox_RejectsOversizedBody.
- Subject/sender/tag concatenated into InputSummary without
newline stripping; embedded \n could corrupt JSONL line-based
parsers. New sanitizeInboxField strips \r\n + caps at 256 chars
before interpolation.
3. (Opus INFO) scripts/multi_coord_stress/main.go
Removed dead `must[T]` generic — tracedSearch took over the
fail-fast role for matrix searches, so the helper became unused.
4. (Opus INFO) scripts/multi_coord_stress/main.go:Event
`JudgeRating int` collapsed "judge errored" and "judge said
unrated" both to 0. Changed to *int — nil = errored, 1-5 =
verdict. judgeInboxResult still returns 0 on error; caller
gates on > 0 before assigning.
Dismissed (with rationale):
- Opus WARN ExcludeIDs ordering: verified by code read — filter
applies after sort + before top-K truncation as documented;
no slot waste possible.
- Opus INFO 10 prior-run reports contradict #011: those are
point-in-time snapshots; intentional history.
- Kimi INFO Langfuse error suppression: design intent (best-effort
per package doc).
- Kimi INFO contract schema validation: defer until contract count
grows enough to make hand-edit drift a real risk.
- Kimi INFO paraphrase prompt duplicated across lift + multi_coord:
defer (lift to internal/paraphrase/ when a third consumer appears).
- Qwen HOLD: single-line, no actionable finding.
go test ./cmd/observerd ./internal/langfuse all green; multi_coord
driver builds clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
f971e64745
commit
6c93a38093
@ -110,6 +110,41 @@ type inboxMessage struct {
|
||||
Tag string `json:"tag,omitempty"`
|
||||
}
|
||||
|
||||
// validInboxPriorities is the closed set of priority values. Per
|
||||
// scrum (Kimi WARN): a free-form priority string was accepted, so
|
||||
// downstream consumers parsing on exact enum could see "nonsense"
|
||||
// values. Now rejected at the boundary.
|
||||
var validInboxPriorities = map[string]bool{
|
||||
"": true, // empty defaults to medium below
|
||||
"urgent": true,
|
||||
"high": true,
|
||||
"medium": true,
|
||||
"low": true,
|
||||
}
|
||||
|
||||
// inboxMaxBodyChars caps the inbox body at a sane size so a large
|
||||
// payload doesn't bloat observer's ring buffer or JSONL log. Per
|
||||
// scrum (Opus WARN): only emptiness was checked, so a multi-MB body
|
||||
// would be accepted unconditionally.
|
||||
const inboxMaxBodyChars = 8 * 1024
|
||||
|
||||
// inboxMaxFieldChars caps subject/sender/tag at a much tighter size.
|
||||
// These appear inside the InputSummary string + JSONL log entry, so
|
||||
// long values bloat every record and embedded newlines corrupt
|
||||
// downstream line-based parsers.
|
||||
const inboxMaxFieldChars = 256
|
||||
|
||||
func sanitizeInboxField(s string) string {
|
||||
// Strip newlines so multi-line attacker-controlled strings don't
|
||||
// corrupt the JSONL log (one-line-per-op invariant).
|
||||
s = strings.ReplaceAll(s, "\n", " ")
|
||||
s = strings.ReplaceAll(s, "\r", " ")
|
||||
if len(s) > inboxMaxFieldChars {
|
||||
s = s[:inboxMaxFieldChars]
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (h *handlers) handleInbox(w http.ResponseWriter, r *http.Request) {
|
||||
var msg inboxMessage
|
||||
if !decodeJSON(w, r, &msg) {
|
||||
@ -123,12 +158,25 @@ func (h *handlers) handleInbox(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, "body required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if len(msg.Body) > inboxMaxBodyChars {
|
||||
http.Error(w, fmt.Sprintf("body exceeds %d chars", inboxMaxBodyChars), http.StatusRequestEntityTooLarge)
|
||||
return
|
||||
}
|
||||
if !validInboxPriorities[msg.Priority] {
|
||||
http.Error(w, "priority must be urgent|high|medium|low", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if msg.Priority == "" {
|
||||
msg.Priority = "medium"
|
||||
}
|
||||
// Sanitize the fields that go into the InputSummary string so
|
||||
// newlines don't corrupt the JSONL log.
|
||||
sender := sanitizeInboxField(msg.Sender)
|
||||
subject := sanitizeInboxField(msg.Subject)
|
||||
tag := sanitizeInboxField(msg.Tag)
|
||||
op := observer.ObservedOp{
|
||||
Endpoint: "/observer/inbox/" + msg.Type,
|
||||
InputSummary: fmt.Sprintf("from=%s priority=%s tag=%s subject=%s", msg.Sender, msg.Priority, msg.Tag, msg.Subject),
|
||||
InputSummary: fmt.Sprintf("from=%s priority=%s tag=%s subject=%s", sender, msg.Priority, tag, subject),
|
||||
OutputSummary: msg.Body,
|
||||
Source: observer.SourceInbox,
|
||||
Success: true,
|
||||
|
||||
@ -212,6 +212,35 @@ func TestInbox_RejectsEmptyBody(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestInbox_RejectsBadPriority locks the priority enum (per
|
||||
// scrum Kimi WARN — free-form priority strings were accepted).
|
||||
func TestInbox_RejectsBadPriority(t *testing.T) {
|
||||
r := newTestRouter(t)
|
||||
body := []byte(`{"type":"email","sender":"x","body":"y","priority":"nonsense"}`)
|
||||
req := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
r.ServeHTTP(w, req)
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400 on bad priority, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInbox_RejectsOversizedBody locks the body-size cap (per scrum
|
||||
// Opus WARN — only emptiness was checked before).
|
||||
func TestInbox_RejectsOversizedBody(t *testing.T) {
|
||||
r := newTestRouter(t)
|
||||
bigBody := strings.Repeat("x", inboxMaxBodyChars+1)
|
||||
body := []byte(`{"type":"email","sender":"x","body":"` + bigBody + `","priority":"high"}`)
|
||||
req := httptest.NewRequest("POST", "/observer/inbox", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
r.ServeHTTP(w, req)
|
||||
if w.Code != http.StatusRequestEntityTooLarge {
|
||||
t.Errorf("expected 413 on oversized body, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestWorkflowRun_UnknownMode locks the 400 path on workflow definitions
|
||||
// that reference modes not registered with the runner. The harness's
|
||||
// reality test runs depend on this so an unknown-mode misconfiguration
|
||||
|
||||
@ -171,7 +171,12 @@ func (c *Client) queue(e event) {
|
||||
shouldFlush := len(c.pending) >= c.maxBatch
|
||||
c.mu.Unlock()
|
||||
if shouldFlush {
|
||||
_ = c.Flush(context.Background())
|
||||
// Fire-and-forget so the calling goroutine isn't blocked by
|
||||
// the 5s HTTP timeout when Langfuse hiccups. Per scrum (Opus
|
||||
// WARN): synchronous flush from queue defeated the
|
||||
// "best-effort, never blocks calling path" guarantee in the
|
||||
// package doc.
|
||||
go func() { _ = c.Flush(context.Background()) }()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -117,8 +117,10 @@ type Event struct {
|
||||
// original inbox body (not the LLM-parsed query). Lets us flag
|
||||
// the case where LLM parsing produces a tight-distance match
|
||||
// but the result doesn't actually fit the original ask.
|
||||
// 0 = unrated, 1-5 = judge verdict.
|
||||
JudgeRating int `json:"judge_rating,omitempty"`
|
||||
// nil = judge didn't run (or errored); 1-5 = judge verdict.
|
||||
// Pointer (per scrum Opus INFO) so "errored" and "unrated"
|
||||
// don't collide on the int zero value.
|
||||
JudgeRating *int `json:"judge_rating,omitempty"`
|
||||
Note string `json:"note,omitempty"`
|
||||
TimestampUnixNano int64 `json:"ts_ns"`
|
||||
}
|
||||
@ -532,7 +534,12 @@ func main() {
|
||||
if len(resp.Results) > 0 {
|
||||
judgeStart := time.Now()
|
||||
rating := judgeInboxResult(hc, *ollama, *judgeModel, ie.Body, resp.Results[0])
|
||||
ev.JudgeRating = rating
|
||||
if rating > 0 {
|
||||
// Only set when we got an actual verdict (1-5); a 0
|
||||
// from judgeInboxResult means decode/HTTP error and
|
||||
// we want absent-from-JSON to flag that distinctly.
|
||||
ev.JudgeRating = &rating
|
||||
}
|
||||
emitSpan("llm.judge_top1", judgeStart,
|
||||
map[string]any{"original_body": ie.Body, "top1_id": resp.Results[0].ID, "top1_corpus": resp.Results[0].Corpus},
|
||||
map[string]any{"rating": rating}, "")
|
||||
@ -1389,9 +1396,3 @@ func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, s
|
||||
return nil
|
||||
}
|
||||
|
||||
func must[T any](v T, err error) T {
|
||||
if err != nil {
|
||||
log.Fatalf("[stress] %v", err)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user