Cross-lineage scrum on bundle 87cbd10..f971e64 (3,652 lines)
produced 4 actionable findings, all defensive hardening.
1. (Opus WARN) internal/langfuse/client.go:queue
Synchronous Flush at maxBatch threshold blocked the calling
goroutine for the full 5s HTTP timeout when Langfuse hiccupped,
defeating the "best-effort, never blocks calling path" contract
in the package doc. Now fire-and-forget via goroutine.
2. (Opus + Kimi convergent) cmd/observerd/main.go:handleInbox
- Free-form priority string was accepted; "nonsense" passed
through unchecked. Now closed enum: urgent|high|medium|low (+
empty defaults to medium). Tested: TestInbox_RejectsBadPriority.
- No size cap on body, only emptiness check; multi-MB payloads
would bloat observer's ring + JSONL. Now 8 KiB cap returns 413.
Tested: TestInbox_RejectsOversizedBody.
- Subject/sender/tag concatenated into InputSummary without
newline stripping; embedded \n could corrupt JSONL line-based
parsers. New sanitizeInboxField strips \r\n + caps at 256 chars
before interpolation.
3. (Opus INFO) scripts/multi_coord_stress/main.go
Removed dead `must[T]` generic — tracedSearch took over the
fail-fast role for matrix searches, so the helper became unused.
4. (Opus INFO) scripts/multi_coord_stress/main.go:Event
`JudgeRating int` collapsed "judge errored" and "judge said
unrated" both to 0. Changed to *int — nil = errored, 1-5 =
verdict. judgeInboxResult still returns 0 on error; caller
gates on > 0 before assigning.
Dismissed (with rationale):
- Opus WARN ExcludeIDs ordering: verified by code read — filter
applies after sort + before top-K truncation as documented;
no slot waste possible.
- Opus INFO 10 prior-run reports contradict #011: those are
point-in-time snapshots; intentional history.
- Kimi INFO Langfuse error suppression: design intent (best-effort
per package doc).
- Kimi INFO contract schema validation: defer until contract count
grows enough to make hand-edit drift a real risk.
- Kimi INFO paraphrase prompt duplicated across lift + multi_coord:
defer (lift to internal/paraphrase/ when a third consumer appears).
- Qwen HOLD: single-line, no actionable finding.
go test ./cmd/observerd ./internal/langfuse all green; multi_coord
driver builds clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
223 lines
5.9 KiB
Go
223 lines
5.9 KiB
Go
// Package langfuse is a minimal Go-side client for the Langfuse v2
|
|
// ingestion API. Mirrors the surface area we need from the Rust
|
|
// crates/gateway/src/v1/langfuse_trace.rs emitter — Trace + Span,
|
|
// nothing else yet (no scores, no observations, no datasets).
|
|
//
|
|
// Auth is Basic over public_key:secret_key. URL + creds come from
|
|
// /etc/lakehouse/langfuse.env in production; tests can pass any URL.
|
|
//
|
|
// Best-effort transport: errors are logged but don't fail the calling
|
|
// path. Lakehouse's internal services should never go down because
|
|
// Langfuse is unreachable.
|
|
package langfuse
|
|
|
|
import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"
)
|
|
|
|
// Client posts traces + spans to Langfuse's ingestion endpoint.
|
|
// Events are buffered and flushed in batches. Always call Flush
|
|
// before exit; Close also flushes.
|
|
type Client struct {
|
|
url string
|
|
auth string // pre-encoded "Basic ..."
|
|
hc *http.Client
|
|
mu sync.Mutex
|
|
pending []event
|
|
maxBatch int
|
|
}
|
|
|
|
// New constructs a Client. URL like "http://localhost:3001"; creds
|
|
// from langfuse.env. nil hc → uses default with 5s timeout.
|
|
func New(url, publicKey, secretKey string, hc *http.Client) *Client {
|
|
if hc == nil {
|
|
hc = &http.Client{Timeout: 5 * time.Second}
|
|
}
|
|
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(publicKey+":"+secretKey))
|
|
return &Client{
|
|
url: url,
|
|
auth: auth,
|
|
hc: hc,
|
|
maxBatch: 50,
|
|
}
|
|
}
|
|
|
|
// NewID returns 16 random bytes rendered as a 32-character lowercase
// hex string, for use as a trace, span, or event id. Langfuse accepts
// arbitrary strings as ids; random hex keeps them unambiguous.
func NewID() string {
	var raw [16]byte
	// The error from rand.Read is deliberately discarded: id generation
	// is part of a best-effort path and has no error return.
	_, _ = rand.Read(raw[:])
	return hex.EncodeToString(raw[:])
}
|
|
|
|
// event is a single envelope in a Langfuse ingestion batch. The Body
// layout depends on Type, so it is kept as a free-form map rather than
// one struct per event kind.
type event struct {
	// ID identifies the envelope itself; the trace/span id lives in Body.
	ID string `json:"id"`
	// Type is "trace-create" or "span-create".
	Type string `json:"type"`
	// Timestamp is the time the event was queued, as a formatted string.
	Timestamp string `json:"timestamp"`
	// Body carries the type-specific payload.
	Body map[string]any `json:"body"`
}
|
|
|
|
// TraceInput holds the caller-supplied fields for a new trace. Only
// Name is always sent; the other fields are sent when they are set.
type TraceInput struct {
	Name     string         // trace name, always included
	UserID   string         // sent as "userId" when non-empty
	Input    any            // sent as "input" when non-nil
	Metadata map[string]any // sent as "metadata" when non-nil
	Tags     []string       // sent as "tags" when non-empty
}
|
|
|
|
// Trace records a top-level trace. Returns the trace id so callers
|
|
// can attach spans. Best-effort: errors are logged and the trace
|
|
// id is still returned so callers don't need error-handling for the
|
|
// common case.
|
|
func (c *Client) Trace(ctx context.Context, t TraceInput) string {
|
|
id := NewID()
|
|
body := map[string]any{
|
|
"id": id,
|
|
"name": t.Name,
|
|
}
|
|
if t.UserID != "" {
|
|
body["userId"] = t.UserID
|
|
}
|
|
if t.Input != nil {
|
|
body["input"] = t.Input
|
|
}
|
|
if t.Metadata != nil {
|
|
body["metadata"] = t.Metadata
|
|
}
|
|
if len(t.Tags) > 0 {
|
|
body["tags"] = t.Tags
|
|
}
|
|
c.queue(event{
|
|
ID: NewID(),
|
|
Type: "trace-create",
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Body: body,
|
|
})
|
|
return id
|
|
}
|
|
|
|
// SpanInput holds the caller-supplied fields for a span. TraceID and
// Name are always sent; every other field is sent only when set.
type SpanInput struct {
	TraceID  string // id of the trace this span belongs to
	ParentID string // optional; parent span id for nested spans
	Name     string

	Input    any
	Output   any
	Metadata map[string]any

	StartTime time.Time // omitted from the payload when zero
	EndTime   time.Time // omitted from the payload when zero

	StatusCode int    // 0 = success, anything else = error code
	Level      string // "DEBUG" | "DEFAULT" | "WARNING" | "ERROR"
}
|
|
|
|
// Span records one span attached to a trace. Returns the span id.
|
|
func (c *Client) Span(ctx context.Context, s SpanInput) string {
|
|
id := NewID()
|
|
body := map[string]any{
|
|
"id": id,
|
|
"traceId": s.TraceID,
|
|
"name": s.Name,
|
|
}
|
|
if s.ParentID != "" {
|
|
body["parentObservationId"] = s.ParentID
|
|
}
|
|
if s.Input != nil {
|
|
body["input"] = s.Input
|
|
}
|
|
if s.Output != nil {
|
|
body["output"] = s.Output
|
|
}
|
|
if s.Metadata != nil {
|
|
body["metadata"] = s.Metadata
|
|
}
|
|
if !s.StartTime.IsZero() {
|
|
body["startTime"] = s.StartTime.UTC().Format(time.RFC3339Nano)
|
|
}
|
|
if !s.EndTime.IsZero() {
|
|
body["endTime"] = s.EndTime.UTC().Format(time.RFC3339Nano)
|
|
}
|
|
if s.Level != "" {
|
|
body["level"] = s.Level
|
|
}
|
|
if s.StatusCode != 0 {
|
|
body["statusMessage"] = fmt.Sprintf("status_code=%d", s.StatusCode)
|
|
}
|
|
c.queue(event{
|
|
ID: NewID(),
|
|
Type: "span-create",
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Body: body,
|
|
})
|
|
return id
|
|
}
|
|
|
|
func (c *Client) queue(e event) {
|
|
c.mu.Lock()
|
|
c.pending = append(c.pending, e)
|
|
shouldFlush := len(c.pending) >= c.maxBatch
|
|
c.mu.Unlock()
|
|
if shouldFlush {
|
|
// Fire-and-forget so the calling goroutine isn't blocked by
|
|
// the 5s HTTP timeout when Langfuse hiccups. Per scrum (Opus
|
|
// WARN): synchronous flush from queue defeated the
|
|
// "best-effort, never blocks calling path" guarantee in the
|
|
// package doc.
|
|
go func() { _ = c.Flush(context.Background()) }()
|
|
}
|
|
}
|
|
|
|
// Flush sends all queued events in one batch. Best-effort: returns
|
|
// the error but also logs; callers can ignore.
|
|
func (c *Client) Flush(ctx context.Context) error {
|
|
c.mu.Lock()
|
|
if len(c.pending) == 0 {
|
|
c.mu.Unlock()
|
|
return nil
|
|
}
|
|
batch := c.pending
|
|
c.pending = nil
|
|
c.mu.Unlock()
|
|
|
|
body, err := json.Marshal(map[string]any{"batch": batch})
|
|
if err != nil {
|
|
slog.Warn("langfuse: marshal batch", "err", err, "n", len(batch))
|
|
return err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, "POST", c.url+"/api/public/ingestion", bytes.NewReader(body))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Authorization", c.auth)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
resp, err := c.hc.Do(req)
|
|
if err != nil {
|
|
slog.Warn("langfuse: post", "err", err, "n", len(batch))
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode/100 != 2 && resp.StatusCode != 207 {
|
|
slog.Warn("langfuse: non-2xx", "status", resp.StatusCode, "n", len(batch))
|
|
return fmt.Errorf("langfuse ingestion: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close flushes any remaining events. Idempotent.
|
|
func (c *Client) Close() error {
|
|
return c.Flush(context.Background())
|
|
}
|