root 7e6431e4fd langfuse: Go-side client + Phase 1c instrumentation
The Rust side has Langfuse tracing already (gateway/v1/langfuse_trace.rs);
this commit lands Go-side parity so the multi-coord stress harness can
emit traces visible at http://localhost:3001.

internal/langfuse/client.go:
- Minimal Trace + Span + Flush API mirroring what the Rust emitter
  uses. Auth: Basic over public_key:secret_key.
- Best-effort posture: errors are slog.Warn'd, never block calling
  paths. Same fail-open as observerd's persistor (ADR-005 Decision
  5.1) — observability is a witness, not a gate.
- Events buffered until 50, then auto-flushed; explicit Flush() at
  process exit.
- Each Trace/Span returns its id so callers can build hierarchies.

multi_coord_stress driver wiring:
- New --langfuse-env flag (default /etc/lakehouse/langfuse.env).
  Empty / missing / unparseable file → skip tracing with a logged
  warning; run still proceeds.
- Phase 1c (inbox burst) now emits one parent trace for the whole burst,
  plus 4 spans per inbox event:
    1. observerd.inbox.record  (post to /v1/observer/inbox)
    2. llm.parse_demand        (qwen2.5 → structured fields)
    3. matrix.search           (parsed query → top-K)
    4. llm.judge_top1          (rate top-1 vs original body)
  Each span carries input/output JSON + start/end times so the
  Langfuse UI shows a full waterfall per event.

Run #009 result:
  Trace landed: "multi_coord_stress phase 1c inbox burst"
  Observations attached: 24 (= 6 events × 4 spans)
  Tags: stress, phase-1c, inbox
  Browseable at http://localhost:3001 by tag query.

Other harness metrics: diversity 0.016, determinism 1.000,
verbatim handover 4/4, paraphrase handover 4/4 — all unchanged
by the tracing addition (best-effort post in parallel).

Phase 1c is the proof-of-concept; future commits can wrap other
phases (baseline / merge / handover / split) in traces too. Once
that's done, the entire stress run becomes scrubbable in Langfuse
without grepping the events JSON.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 16:25:03 -05:00

218 lines
5.7 KiB
Go

// Package langfuse is a minimal Go-side client for the Langfuse v2
// ingestion API. Mirrors the surface area we need from the Rust
// crates/gateway/src/v1/langfuse_trace.rs emitter — Trace + Span,
// nothing else yet (no generation/event observations, no scores, no datasets).
//
// Auth is Basic over public_key:secret_key. URL + creds come from
// /etc/lakehouse/langfuse.env in production; tests can pass any URL.
//
// Best-effort transport: errors are logged but don't fail the calling
// path. Lakehouse's internal services should never go down because
// Langfuse is unreachable.
package langfuse
import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"
)
// Client posts traces + spans to Langfuse's ingestion endpoint.
// Events are buffered in memory and flushed in batches: automatically
// once maxBatch events are queued, or explicitly via Flush / Close.
// Always call Flush before exit; Close also flushes. Events still queued
// when the process exits are lost.
//
// pending is guarded by mu, so Trace, Span and Flush may be called from
// multiple goroutines.
type Client struct {
	url      string       // Langfuse base URL; Flush appends "/api/public/ingestion"
	auth     string       // pre-encoded "Basic ..."
	hc       *http.Client // used for the ingestion POSTs
	mu       sync.Mutex   // guards pending
	pending  []event      // events queued since the last Flush
	maxBatch int          // queue length at which queue triggers an automatic Flush
}
// New constructs a Client.
//
// url is the Langfuse base URL, e.g. "http://localhost:3001". Trailing
// slashes are trimmed, so "http://localhost:3001/" works too. publicKey
// and secretKey (from langfuse.env) are joined as "publicKey:secretKey"
// for HTTP Basic auth. A nil hc means a default client with a 5s overall
// timeout. The returned Client auto-flushes once 50 events are queued.
func New(url, publicKey, secretKey string, hc *http.Client) *Client {
	if hc == nil {
		hc = &http.Client{Timeout: 5 * time.Second}
	}
	creds := publicKey + ":" + secretKey
	return &Client{
		// Flush appends "/api/public/ingestion"; without trimming, a
		// trailing slash here would yield "//api/public/ingestion",
		// which the server may not route.
		url:      strings.TrimRight(url, "/"),
		auth:     "Basic " + base64.StdEncoding.EncodeToString([]byte(creds)),
		hc:       hc,
		maxBatch: 50,
	}
}
// NewID returns a fresh random id for traces, spans and ingestion
// envelopes: 16 bytes from crypto/rand, hex-encoded to 32 lowercase
// characters. Langfuse accepts arbitrary strings, so plain hex suffices.
func NewID() string {
	var raw [16]byte
	// The error is deliberately discarded; ids are best-effort like the
	// rest of this package (see crypto/rand.Read for when it can fail).
	_, _ = rand.Read(raw[:])
	return hex.EncodeToString(raw[:])
}
// event is one Langfuse ingestion envelope, sent as an element of the
// "batch" array that Flush posts. Body shape varies by type (trace-create
// vs span-create); map[string]any keeps the wire shape declarative and
// lets Trace/Span omit unset optional fields simply by not adding keys.
type event struct {
	ID        string         `json:"id"`        // envelope id; distinct from the trace/span id inside Body
	Type      string         `json:"type"`      // "trace-create" | "span-create"
	Timestamp string         `json:"timestamp"` // UTC RFC 3339 (nanosecond) time the envelope was built
	Body      map[string]any `json:"body"`
}
// TraceInput is what callers fill in when starting a trace with
// Client.Trace. Name is always sent; the other fields are left out of the
// trace-create body when empty (UserID, Tags) or nil (Input, Metadata).
type TraceInput struct {
	Name     string
	UserID   string         // sent as "userId"
	Input    any            // must be JSON-marshalable, or the whole batch fails in Flush
	Metadata map[string]any // values must be JSON-marshalable as well
	Tags     []string
}
// Trace queues a "trace-create" event for a new top-level trace and
// returns the generated trace id, which callers pass as SpanInput.TraceID.
//
// Best-effort: nothing is sent synchronously except when this call fills
// the batch (see queue), and any delivery failure is only logged, so the
// id is always returned and callers need no error handling. Optional
// fields are included only when set: UserID non-empty, Input and Metadata
// non-nil, Tags non-empty. ctx is currently unused.
func (c *Client) Trace(ctx context.Context, t TraceInput) string {
	traceID := NewID()
	fields := map[string]any{"id": traceID, "name": t.Name}
	set := func(key string, present bool, val any) {
		if present {
			fields[key] = val
		}
	}
	set("userId", t.UserID != "", t.UserID)
	set("input", t.Input != nil, t.Input)
	set("metadata", t.Metadata != nil, t.Metadata)
	set("tags", len(t.Tags) > 0, t.Tags)

	envelope := event{
		ID:        NewID(),
		Type:      "trace-create",
		Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
		Body:      fields,
	}
	c.queue(envelope)
	return traceID
}
// SpanInput is what callers fill in when recording a span with
// Client.Span. TraceID and Name are always sent; every other field is left
// out of the span-create body when it is the zero value ("" / nil /
// zero time / 0).
type SpanInput struct {
	TraceID    string         // id returned by Client.Trace
	ParentID   string         // optional — for nested spans; sent as "parentObservationId"
	Name       string
	Input      any            // must be JSON-marshalable, or the whole batch fails in Flush
	Output     any            // must be JSON-marshalable, or the whole batch fails in Flush
	Metadata   map[string]any
	StartTime  time.Time      // sent as UTC RFC 3339 with nanoseconds
	EndTime    time.Time      // sent as UTC RFC 3339 with nanoseconds
	StatusCode int            // 0 = success, anything else = error code; non-zero is sent as statusMessage "status_code=<n>" and does not set Level
	Level      string         // "DEBUG" | "DEFAULT" | "WARNING" | "ERROR"
}
// Span queues a "span-create" event attached to s.TraceID and returns the
// new span id, which can be used as another span's ParentID.
//
// Only fields the caller set are sent: empty ParentID/Level, nil
// Input/Output/Metadata, zero StartTime/EndTime and zero StatusCode are
// omitted. A non-zero StatusCode becomes statusMessage "status_code=<n>".
// Delivery happens on the next Flush, which queue may trigger immediately
// if this event fills the batch. ctx is currently unused.
func (c *Client) Span(ctx context.Context, s SpanInput) string {
	spanID := NewID()
	stamp := func(at time.Time) string { return at.UTC().Format(time.RFC3339Nano) }

	fields := map[string]any{
		"id":      spanID,
		"traceId": s.TraceID,
		"name":    s.Name,
	}
	set := func(key string, present bool, val any) {
		if present {
			fields[key] = val
		}
	}
	set("parentObservationId", s.ParentID != "", s.ParentID)
	set("input", s.Input != nil, s.Input)
	set("output", s.Output != nil, s.Output)
	set("metadata", s.Metadata != nil, s.Metadata)
	if !s.StartTime.IsZero() {
		fields["startTime"] = stamp(s.StartTime)
	}
	if !s.EndTime.IsZero() {
		fields["endTime"] = stamp(s.EndTime)
	}
	set("level", s.Level != "", s.Level)
	if s.StatusCode != 0 {
		fields["statusMessage"] = fmt.Sprintf("status_code=%d", s.StatusCode)
	}

	c.queue(event{
		ID:        NewID(),
		Type:      "span-create",
		Timestamp: stamp(time.Now()),
		Body:      fields,
	})
	return spanID
}
// queue appends e to the pending buffer. Once the buffer holds maxBatch
// or more events it is flushed synchronously on the caller's goroutine,
// so the Trace/Span call that fills a batch waits for the ingestion POST.
// That wait is bounded only by c.hc's timeout, since the flush uses
// context.Background().
func (c *Client) queue(e event) {
	c.mu.Lock()
	c.pending = append(c.pending, e)
	full := len(c.pending) >= c.maxBatch
	c.mu.Unlock()

	if !full {
		return
	}
	// Flush takes c.mu itself, so it must run after the unlock above.
	// Flush already logs its failures; the returned error adds nothing here.
	_ = c.Flush(context.Background())
}
// Flush sends all queued events in one POST to <url>/api/public/ingestion
// and clears the queue.
//
// Best-effort: every failure is logged via slog.Warn and also returned, so
// callers can ignore the error. A batch that fails to send is dropped, not
// re-queued. A 207 Multi-Status reply counts as success (nil error), but
// any per-event rejections it reports are logged.
func (c *Client) Flush(ctx context.Context) error {
	// Take ownership of the buffer under the lock; the network call below
	// runs unlocked so concurrent Trace/Span calls can keep queueing.
	c.mu.Lock()
	batch := c.pending
	c.pending = nil
	c.mu.Unlock()
	if len(batch) == 0 {
		return nil
	}

	body, err := json.Marshal(map[string]any{"batch": batch})
	if err != nil {
		slog.Warn("langfuse: marshal batch", "err", err, "n", len(batch))
		return err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url+"/api/public/ingestion", bytes.NewReader(body))
	if err != nil {
		// e.g. a malformed base URL; log it like every other failure path.
		slog.Warn("langfuse: build request", "err", err, "n", len(batch))
		return err
	}
	req.Header.Set("Authorization", c.auth)
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.hc.Do(req)
	if err != nil {
		slog.Warn("langfuse: post", "err", err, "n", len(batch))
		return err
	}
	defer func() {
		// Drain a bounded amount before closing so net/http can reuse the
		// keep-alive connection for the next batch.
		const maxDrain = 64 << 10
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrain))
		_ = resp.Body.Close()
	}()

	// 207 Multi-Status is itself 2xx, so one range check covers it.
	if resp.StatusCode/100 != 2 {
		slog.Warn("langfuse: non-2xx", "status", resp.StatusCode, "n", len(batch))
		return fmt.Errorf("langfuse ingestion: HTTP %d", resp.StatusCode)
	}
	if resp.StatusCode == http.StatusMultiStatus {
		// Langfuse reports per-event outcomes in a 207 body; surface any
		// rejections instead of silently treating the batch as delivered.
		var result struct {
			Errors []json.RawMessage `json:"errors"`
		}
		if err := json.NewDecoder(io.LimitReader(resp.Body, 1<<20)).Decode(&result); err == nil && len(result.Errors) > 0 {
			slog.Warn("langfuse: events rejected",
				"rejected", len(result.Errors), "n", len(batch), "first", string(result.Errors[0]))
		}
	}
	return nil
}
// Close flushes any events still queued and returns Flush's error.
// Calling it again is harmless: with nothing pending, Flush returns nil
// without making a request.
func (c *Client) Close() error {
	ctx := context.Background()
	err := c.Flush(ctx)
	return err
}