root 6c93a38093 scrum multi_coord_phase3: 4 fixes from cross-lineage review
Cross-lineage scrum on bundle 87cbd10..f971e64 (3,652 lines)
produced 4 actionable findings, all defensive hardening.

1. (Opus WARN) internal/langfuse/client.go:queue
   Synchronous Flush at maxBatch threshold blocked the calling
   goroutine for the full 5s HTTP timeout when Langfuse hiccupped,
   defeating the "best-effort, never blocks calling path" contract
   in the package doc. Now fire-and-forget via goroutine.

2. (Opus + Kimi convergent) cmd/observerd/main.go:handleInbox
   - Free-form priority string was accepted; "nonsense" passed
     through unchecked. Now closed enum: urgent|high|medium|low (+
     empty defaults to medium). Tested: TestInbox_RejectsBadPriority.
   - No size cap on body, only emptiness check; multi-MB payloads
     would bloat observer's ring + JSONL. Now 8 KiB cap returns 413.
     Tested: TestInbox_RejectsOversizedBody.
   - Subject/sender/tag concatenated into InputSummary without
     newline stripping; embedded \n could corrupt JSONL line-based
     parsers. New sanitizeInboxField strips \r\n + caps at 256 chars
     before interpolation.

3. (Opus INFO) scripts/multi_coord_stress/main.go
   Removed dead `must[T]` generic — tracedSearch took over the
   fail-fast role for matrix searches, so the helper became unused.

4. (Opus INFO) scripts/multi_coord_stress/main.go:Event
   `JudgeRating int` collapsed "judge errored" and "judge said
   unrated" both to 0. Changed to *int — nil = errored, 1-5 =
   verdict. judgeInboxResult still returns 0 on error; caller
   gates on > 0 before assigning.

Dismissed (with rationale):
- Opus WARN ExcludeIDs ordering: verified by code read — filter
  applies after sort + before top-K truncation as documented;
  no slot waste possible.
- Opus INFO 10 prior-run reports contradict #011: those are
  point-in-time snapshots; intentional history.
- Kimi INFO Langfuse error suppression: design intent (best-effort
  per package doc).
- Kimi INFO contract schema validation: defer until contract count
  grows enough to make hand-edit drift a real risk.
- Kimi INFO paraphrase prompt duplicated across lift + multi_coord:
  defer (lift to internal/paraphrase/ when a third consumer appears).
- Qwen HOLD: single-line, no actionable finding.

go test ./cmd/observerd ./internal/langfuse all green; multi_coord
driver builds clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 17:42:07 -05:00

223 lines
5.9 KiB
Go

// Package langfuse is a minimal Go-side client for the Langfuse v2
// ingestion API. Mirrors the surface area we need from the Rust
// crates/gateway/src/v1/langfuse_trace.rs emitter — Trace + Span,
// nothing else yet (no scores, no observations, no datasets).
//
// Auth is Basic over public_key:secret_key. URL + creds come from
// /etc/lakehouse/langfuse.env in production; tests can pass any URL.
//
// Best-effort transport: errors are logged but don't fail the calling
// path. Lakehouse's internal services should never go down because
// Langfuse is unreachable.
package langfuse
import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"
)
// Client buffers Langfuse trace and span events in memory and posts
// them to the ingestion endpoint in batches. Construct one with New.
// Call Flush (or Close, which flushes) before the process exits so
// buffered events are not left unsent.
type Client struct {
	// Connection settings, populated by New.
	url  string       // base URL; Flush appends "/api/public/ingestion"
	auth string       // complete Authorization header value ("Basic ...")
	hc   *http.Client // HTTP client used for every ingestion POST

	// Event buffer shared by queue and Flush.
	mu       sync.Mutex // guards pending
	pending  []event    // events queued since the last Flush
	maxBatch int        // pending length at which queue starts a background Flush
}
// New builds a Client that posts to the Langfuse instance at url (for
// example "http://localhost:3001"), authenticating with HTTP Basic auth
// derived from publicKey and secretKey. When hc is nil, a default
// http.Client with a 5-second timeout is used instead. The returned
// Client uses a batch threshold of 50 events.
func New(url, publicKey, secretKey string, hc *http.Client) *Client {
	credentials := []byte(publicKey + ":" + secretKey)
	client := &Client{
		url:      url,
		auth:     "Basic " + base64.StdEncoding.EncodeToString(credentials),
		hc:       hc,
		maxBatch: 50,
	}
	if client.hc == nil {
		client.hc = &http.Client{Timeout: 5 * time.Second}
	}
	return client
}
// NewID returns 16 bytes read from crypto/rand, hex-encoded into a
// 32-character lowercase string, for use as a trace, span, or event
// id. Langfuse accepts arbitrary strings as ids; a random 128-bit
// value keeps them unambiguous.
func NewID() string {
	var raw [16]byte
	// The error is deliberately discarded: id generation has no error
	// return, in keeping with the package's best-effort design.
	// NOTE(review): crypto/rand documents Read as never returning an
	// error on recent Go releases — confirm against the toolchain
	// version this module targets.
	_, _ = rand.Read(raw[:])
	return hex.EncodeToString(raw[:])
}
// event is a single envelope in a Langfuse ingestion batch. The shape
// of Body depends on Type, so it is kept as a free-form map rather
// than one struct per event kind.
type event struct {
	// ID identifies the envelope itself; Trace and Span generate it
	// separately from the trace/span id they place inside Body.
	ID string `json:"id"`
	// Type is "trace-create" or "span-create".
	Type string `json:"type"`
	// Timestamp is the enqueue time, formatted as RFC 3339 with
	// nanoseconds in UTC by Trace and Span.
	Timestamp string `json:"timestamp"`
	// Body is the type-specific payload.
	Body map[string]any `json:"body"`
}
// TraceInput carries the caller-supplied fields for a new trace. Only
// Name is always sent; Trace omits every other field that is left at
// its zero value.
type TraceInput struct {
	// Name is sent as the trace's "name".
	Name string
	// UserID is sent as "userId" when non-empty.
	UserID string
	// Input is sent as "input" when non-nil.
	Input any
	// Metadata is sent as "metadata" when non-nil.
	Metadata map[string]any
	// Tags is sent as "tags" when it has at least one element.
	Tags []string
}
// Trace queues a "trace-create" event describing t and returns the
// newly generated trace id, which callers pass as SpanInput.TraceID to
// attach spans. UserID, Input, Metadata and Tags are included only
// when set. There is no error return, so callers need no error
// handling here; delivery problems surface later, when the queue is
// flushed. ctx is accepted but not currently used.
func (c *Client) Trace(ctx context.Context, t TraceInput) string {
	traceID := NewID()
	payload := map[string]any{
		"id":   traceID,
		"name": t.Name,
	}

	// Optional fields are left out of the payload entirely when unset.
	if len(t.Tags) > 0 {
		payload["tags"] = t.Tags
	}
	if t.Metadata != nil {
		payload["metadata"] = t.Metadata
	}
	if t.Input != nil {
		payload["input"] = t.Input
	}
	if t.UserID != "" {
		payload["userId"] = t.UserID
	}

	// The envelope gets its own id, distinct from the trace id.
	envelope := event{
		ID:        NewID(),
		Type:      "trace-create",
		Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
		Body:      payload,
	}
	c.queue(envelope)
	return traceID
}
// SpanInput carries the caller-supplied fields for one span. TraceID
// and Name are always sent; Span omits every other field that is left
// at its zero value.
type SpanInput struct {
	// TraceID is the id of the trace this span belongs to.
	TraceID string
	// ParentID is optional and nests this span under another one; it
	// is sent as "parentObservationId".
	ParentID string
	// Name is sent as the span's "name".
	Name string
	// Input and Output are sent as "input" / "output" when non-nil.
	Input  any
	Output any
	// Metadata is sent as "metadata" when non-nil.
	Metadata map[string]any
	// StartTime and EndTime are sent in UTC when non-zero.
	StartTime time.Time
	EndTime   time.Time
	// StatusCode is 0 for success; any other value is treated as an
	// error code and reported as "status_code=<n>".
	StatusCode int
	// Level is one of "DEBUG", "DEFAULT", "WARNING" or "ERROR".
	Level string
}
// Span queues a "span-create" event describing s and returns the newly
// generated span id. TraceID and Name are always sent; every other
// field is included only when set. A non-zero StatusCode is reported
// as a "status_code=<n>" status message. ctx is accepted but not
// currently used.
func (c *Client) Span(ctx context.Context, s SpanInput) string {
	// stamp renders a time the same way for every timestamp field.
	stamp := func(ts time.Time) string {
		return ts.UTC().Format(time.RFC3339Nano)
	}

	spanID := NewID()
	payload := map[string]any{
		"id":      spanID,
		"traceId": s.TraceID,
		"name":    s.Name,
	}

	// Linkage and content.
	if s.ParentID != "" {
		payload["parentObservationId"] = s.ParentID
	}
	if s.Input != nil {
		payload["input"] = s.Input
	}
	if s.Output != nil {
		payload["output"] = s.Output
	}
	if s.Metadata != nil {
		payload["metadata"] = s.Metadata
	}

	// Timing: zero times are left out.
	if !s.StartTime.IsZero() {
		payload["startTime"] = stamp(s.StartTime)
	}
	if !s.EndTime.IsZero() {
		payload["endTime"] = stamp(s.EndTime)
	}

	// Outcome.
	if s.Level != "" {
		payload["level"] = s.Level
	}
	if s.StatusCode != 0 {
		payload["statusMessage"] = fmt.Sprintf("status_code=%d", s.StatusCode)
	}

	// The envelope gets its own id, distinct from the span id.
	c.queue(event{
		ID:        NewID(),
		Type:      "span-create",
		Timestamp: stamp(time.Now()),
		Body:      payload,
	})
	return spanID
}
// queue appends e to the pending buffer and, once the buffer holds at
// least maxBatch events, starts a Flush on a new goroutine so the
// caller never waits on the HTTP round trip. Per scrum (Opus WARN): a
// synchronous flush here defeated the "best-effort, never blocks
// calling path" guarantee in the package doc.
//
// The background flush is untracked: its returned error is discarded
// (Flush does its own logging) and Close does not wait for it.
func (c *Client) queue(e event) {
	c.mu.Lock()
	c.pending = append(c.pending, e)
	batchFull := len(c.pending) >= c.maxBatch
	c.mu.Unlock()

	if !batchFull {
		return
	}
	go func() {
		_ = c.Flush(context.Background())
	}()
}
// Flush sends every currently queued event to Langfuse in a single
// POST to /api/public/ingestion and empties the queue. It returns nil
// when nothing is queued or when the server answers with a 2xx status
// (207 Multi-Status included).
//
// Delivery is best-effort: every failure is logged via slog and also
// returned, so callers may ignore the result. The batch is detached
// from the queue before the request is made and is not re-queued on
// failure.
//
// ctx bounds the HTTP request, in addition to whatever timeout the
// Client's http.Client carries.
//
// NOTE(review): the response body is not inspected, so any per-event
// failures Langfuse reports inside a 207 body would go unnoticed —
// confirm against the ingestion API reference.
func (c *Client) Flush(ctx context.Context) error {
	// Upper bound on how much response body is read just to keep the
	// connection reusable; anything larger is abandoned on Close.
	const maxDrainBytes = 64 << 10

	// Detach the batch under the lock so producers are not blocked for
	// the duration of the network call.
	c.mu.Lock()
	if len(c.pending) == 0 {
		c.mu.Unlock()
		return nil
	}
	batch := c.pending
	c.pending = nil
	c.mu.Unlock()

	body, err := json.Marshal(map[string]any{"batch": batch})
	if err != nil {
		slog.Warn("langfuse: marshal batch", "err", err, "n", len(batch))
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url+"/api/public/ingestion", bytes.NewReader(body))
	if err != nil {
		// Logged like every other failure path so the "returns the
		// error but also logs" contract holds here too.
		slog.Warn("langfuse: build request", "err", err, "n", len(batch))
		return err
	}
	req.Header.Set("Authorization", c.auth)
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.hc.Do(req)
	if err != nil {
		slog.Warn("langfuse: post", "err", err, "n", len(batch))
		return err
	}
	defer func() {
		// Read the (bounded) remainder before closing: the transport
		// may not reuse a connection whose body was not read to EOF.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
		_ = resp.Body.Close()
	}()

	// 207 is already covered by the 2xx check.
	if resp.StatusCode/100 != 2 {
		slog.Warn("langfuse: non-2xx", "status", resp.StatusCode, "n", len(batch))
		return fmt.Errorf("langfuse ingestion: HTTP %d", resp.StatusCode)
	}
	return nil
}
// Close sends whatever is still queued by calling Flush with a
// background context, and returns Flush's result. It releases no
// resources, so it is safe to call more than once. It does not wait
// for flushes that queue has already started in the background.
func (c *Client) Close() error {
	ctx := context.Background()
	return c.Flush(ctx)
}