langfuse: Go-side client + Phase 1c instrumentation

The Rust side has Langfuse tracing already (gateway/v1/langfuse_trace.rs);
this commit lands Go-side parity so the multi-coord stress harness can
emit traces visible at http://localhost:3001.

internal/langfuse/client.go:
- Minimal Trace + Span + Flush API mirroring what the Rust emitter
  uses. Auth: Basic over public_key:secret_key.
- Best-effort posture: errors are slog.Warn'd, never block calling
  paths. Same fail-open as observerd's persistor (ADR-005 Decision
  5.1) — observability is a witness, not a gate.
- Events buffered until 50, then auto-flushed; explicit Flush() at
  process exit.
- Each Trace/Span returns its id so callers can build hierarchies.

multi_coord_stress driver wiring:
- New --langfuse-env flag (default /etc/lakehouse/langfuse.env).
  Empty / missing / unparseable file → skip tracing with a logged
  warning; run still proceeds.
- Phase 1c (inbox burst) now emits one parent trace + 4 spans per
  inbox event:
    1. observerd.inbox.record  (post to /v1/observer/inbox)
    2. llm.parse_demand        (qwen2.5 → structured fields)
    3. matrix.search           (parsed query → top-K)
    4. llm.judge_top1          (rate top-1 vs original body)
  Each span carries input/output JSON + start/end times so the
  Langfuse UI shows a full waterfall per event.

Run #009 result:
  Trace landed: "multi_coord_stress phase 1c inbox burst"
  Observations attached: 24 (= 6 events × 4 spans)
  Tags: stress, phase-1c, inbox
  Browseable at http://localhost:3001 by tag query.

Other harness metrics: diversity 0.016, determinism 1.000,
verbatim handover 4/4, paraphrase handover 4/4 — all unchanged
by the tracing addition (best-effort post in parallel).

Phase 1c is the proof-of-concept; future commits can wrap other
phases (baseline / merge / handover / split) in traces too. Once
that's done, the entire stress run becomes scrubbable in Langfuse
without grepping the events JSON.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-30 16:25:03 -05:00
parent ce940f4a14
commit 7e6431e4fd
3 changed files with 469 additions and 11 deletions

217
internal/langfuse/client.go Normal file
View File

@ -0,0 +1,217 @@
// Package langfuse is a minimal Go-side client for the Langfuse v2
// ingestion API. Mirrors the surface area we need from the Rust
// crates/gateway/src/v1/langfuse_trace.rs emitter — Trace + Span,
// nothing else yet (no scores, no observations, no datasets).
//
// Auth is Basic over public_key:secret_key. URL + creds come from
// /etc/lakehouse/langfuse.env in production; tests can pass any URL.
//
// Best-effort transport: errors are logged but don't fail the calling
// path. Lakehouse's internal services should never go down because
// Langfuse is unreachable.
package langfuse
import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"
)
// Client posts traces + spans to Langfuse's ingestion endpoint.
// Events are buffered and flushed in batches. Always call Flush
// before exit; Close also flushes.
//
// Safe for concurrent use: the pending buffer is guarded by mu.
type Client struct {
	url  string // Langfuse base URL, e.g. "http://localhost:3001"
	auth string // pre-encoded "Basic ..."
	hc   *http.Client // transport; New installs a 5s-timeout default when nil
	mu   sync.Mutex // guards pending
	pending  []event // buffered ingestion envelopes awaiting Flush
	maxBatch int // auto-flush threshold (set to 50 by New)
}
// New constructs a Client. URL like "http://localhost:3001"; creds
// from langfuse.env. A nil hc installs a default client with a 5s
// timeout so a hung Langfuse can never stall callers for long.
func New(url, publicKey, secretKey string, hc *http.Client) *Client {
	transport := hc
	if transport == nil {
		transport = &http.Client{Timeout: 5 * time.Second}
	}
	// Basic auth over public_key:secret_key, pre-encoded once so each
	// flush just sets the header.
	creds := []byte(publicKey + ":" + secretKey)
	return &Client{
		url:      url,
		auth:     "Basic " + base64.StdEncoding.EncodeToString(creds),
		hc:       transport,
		maxBatch: 50,
	}
}
// NewID returns a hex string suitable as a trace/span id. Langfuse
// accepts arbitrary strings; a 16-byte random hex is unambiguous.
func NewID() string {
	var raw [16]byte
	// crypto/rand.Read does not fail on supported platforms; the error
	// is deliberately discarded, matching the client's best-effort posture.
	_, _ = rand.Read(raw[:])
	return hex.EncodeToString(raw[:])
}
// event is one Langfuse ingestion envelope. Body shape varies by
// type (trace-create vs span-create); we use map[string]any to
// keep the wire shape declarative.
type event struct {
	ID string `json:"id"` // envelope id — distinct from the trace/span id inside Body
	Type string `json:"type"` // "trace-create" | "span-create"
	Timestamp string `json:"timestamp"` // RFC3339Nano UTC, set when the event is queued
	Body map[string]any `json:"body"` // trace/span payload; optional keys set only when present
}
// TraceInput is what callers fill in when starting a trace.
// All fields are optional on the wire: Trace omits any field left
// at its zero value so the payload stays minimal.
type TraceInput struct {
	Name string // trace display name
	UserID string // omitted when empty
	Input any // arbitrary JSON-serializable input; omitted when nil
	Metadata map[string]any // omitted when nil
	Tags []string // omitted when empty
}
// Trace records a top-level trace. Returns the trace id so callers
// can attach spans. Best-effort: errors are logged and the trace
// id is still returned so callers don't need error-handling for the
// common case.
func (c *Client) Trace(ctx context.Context, t TraceInput) string {
	traceID := NewID()
	body := map[string]any{
		"id":   traceID,
		"name": t.Name,
	}
	// Optional fields go on the wire only when the caller set them.
	put := func(key string, val any, present bool) {
		if present {
			body[key] = val
		}
	}
	put("userId", t.UserID, t.UserID != "")
	put("input", t.Input, t.Input != nil)
	put("metadata", t.Metadata, t.Metadata != nil)
	put("tags", t.Tags, len(t.Tags) > 0)
	c.queue(event{
		ID:        NewID(), // envelope id, separate from the trace id
		Type:      "trace-create",
		Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
		Body:      body,
	})
	return traceID
}
// SpanInput is what callers fill in when recording a span.
// Zero-valued optional fields are omitted from the wire payload.
type SpanInput struct {
	TraceID string // id returned by Trace; required to attach the span
	ParentID string // optional — for nested spans (parentObservationId on the wire)
	Name string // span display name
	Input any // omitted when nil
	Output any // omitted when nil
	Metadata map[string]any // omitted when nil
	StartTime time.Time // omitted when zero; sent as RFC3339Nano UTC
	EndTime time.Time // omitted when zero; sent as RFC3339Nano UTC
	StatusCode int // 0 = success, anything else = error code (sent as statusMessage)
	Level string // "DEBUG" | "DEFAULT" | "WARNING" | "ERROR"; omitted when empty
}
// Span records one span attached to a trace. Returns the span id.
// Best-effort like Trace: nothing here can fail the calling path.
func (c *Client) Span(ctx context.Context, s SpanInput) string {
	spanID := NewID()
	body := map[string]any{
		"id":      spanID,
		"traceId": s.TraceID,
		"name":    s.Name,
	}
	// Optional fields go on the wire only when the caller set them.
	put := func(key string, val any, present bool) {
		if present {
			body[key] = val
		}
	}
	put("parentObservationId", s.ParentID, s.ParentID != "")
	put("input", s.Input, s.Input != nil)
	put("output", s.Output, s.Output != nil)
	put("metadata", s.Metadata, s.Metadata != nil)
	put("startTime", s.StartTime.UTC().Format(time.RFC3339Nano), !s.StartTime.IsZero())
	put("endTime", s.EndTime.UTC().Format(time.RFC3339Nano), !s.EndTime.IsZero())
	put("level", s.Level, s.Level != "")
	put("statusMessage", fmt.Sprintf("status_code=%d", s.StatusCode), s.StatusCode != 0)
	c.queue(event{
		ID:        NewID(), // envelope id, separate from the span id
		Type:      "span-create",
		Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
		Body:      body,
	})
	return spanID
}
// queue appends one event under the mutex and, once the buffer
// reaches maxBatch, triggers an auto-flush. The flush runs OUTSIDE
// the lock (Flush re-acquires it), so the decision is snapshotted
// into shouldFlush before unlocking. The flush error is deliberately
// discarded — Flush already logs failures, and tracing is best-effort.
func (c *Client) queue(e event) {
	c.mu.Lock()
	c.pending = append(c.pending, e)
	shouldFlush := len(c.pending) >= c.maxBatch
	c.mu.Unlock()
	if shouldFlush {
		// Background context: auto-flush shouldn't inherit a caller's
		// possibly-cancelled context mid-batch.
		_ = c.Flush(context.Background())
	}
}
// Flush sends all queued events in one batch. Best-effort: returns
// the error but also logs; callers can ignore.
//
// The pending buffer is swapped out under the lock, so concurrent
// Trace/Span calls during the POST are safe. Events from a failed
// POST are dropped, not re-queued — acceptable for best-effort
// tracing, and it avoids unbounded growth when Langfuse is down.
func (c *Client) Flush(ctx context.Context) error {
	c.mu.Lock()
	if len(c.pending) == 0 {
		c.mu.Unlock()
		return nil
	}
	batch := c.pending
	c.pending = nil
	c.mu.Unlock()

	body, err := json.Marshal(map[string]any{"batch": batch})
	if err != nil {
		slog.Warn("langfuse: marshal batch", "err", err, "n", len(batch))
		return fmt.Errorf("marshaling langfuse batch: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url+"/api/public/ingestion", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("building langfuse request: %w", err)
	}
	req.Header.Set("Authorization", c.auth)
	req.Header.Set("Content-Type", "application/json")
	resp, err := c.hc.Do(req)
	if err != nil {
		slog.Warn("langfuse: post", "err", err, "n", len(batch))
		return err
	}
	defer resp.Body.Close()
	// Drain the body so the transport can reuse the TCP connection.
	_, _ = io.Copy(io.Discard, resp.Body)
	// 207 Multi-Status (partial success) is already in the 2xx class,
	// so a plain class check covers it; anything else is a failure.
	if resp.StatusCode/100 != 2 {
		slog.Warn("langfuse: non-2xx", "status", resp.StatusCode, "n", len(batch))
		return fmt.Errorf("langfuse ingestion: HTTP %d", resp.StatusCode)
	}
	return nil
}
// Close flushes any remaining events. Idempotent: a second call
// finds an empty pending buffer and returns nil immediately.
// Matches the io.Closer signature so the client fits the usual
// `defer c.Close()` shutdown pattern.
func (c *Client) Close() error {
	return c.Flush(context.Background())
}

View File

@ -0,0 +1,82 @@
# Multi-Coordinator Stress Test — Run 009
**Generated:** 2026-04-30T21:23:59.011167722Z
**Coordinators:** alice / bob / carol (each with own playbook namespace: `playbook_alice` / `playbook_bob` / `playbook_carol`)
**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction
**Corpora:** `workers,ethereal_workers`
**K per query:** 8
**Total events captured:** 67
**Evidence:** `reports/reality-tests/multi_coord_stress_009.json`
---
## Diversity — is the system locking into scenarios or cycling?
| Metric | Mean Jaccard | n pairs | Interpretation |
|---|---:|---:|---|
| Same role across different contracts | 0.015873015873015872 | 9 | Lower = more diverse (different region/cert mix → different workers) |
| Different roles within same contract | 0.015343915343915345 | 18 | Should be near-zero (different roles = different worker pools) |
**Healthy ranges:**
- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract.
- Different roles same contract: < 0.10 means role-specific retrieval is working.
- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent.
---
## Determinism — same query reissued, top-K stability
| Metric | Value |
|---|---:|
| Mean Jaccard on retrieval-only reissue | 1 |
| Number of reissue pairs | 12 |
**Interpretation:**
- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query."
- 0.80–0.95: some HNSW or embedding variance; acceptable.
- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall).
---
## Learning — handover hit rate
Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results?
| Metric | Value |
|---|---:|
| Verbatim handover queries run | 4 |
| Alice's recorded answer at Bob's top-1 (verbatim) | 4 |
| Alice's recorded answer in Bob's top-K (verbatim) | 4 |
| **Verbatim handover hit rate (top-1)** | **1** |
| Paraphrase handover queries run | 4 |
| Alice's recorded answer at Bob's top-1 (paraphrase) | 4 |
| Alice's recorded answer in Bob's top-K (paraphrase) | 4 |
| **Paraphrase handover hit rate (top-1)** | **1** |
**Interpretation:**
- Verbatim hit rate ≈ 1.0: trivial case — Bob runs identical queries; should always hit.
- Paraphrase hit rate ≥ 0.5: institutional memory survives wording change — the harder learning property.
- Paraphrase hit rate ≈ 0.0: Bob's paraphrases drift past the inject threshold, so Alice's recordings don't activate. Same caveat as the playbook_lift paraphrase pass.
---
## Per-event capture
All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. Search by phase:
```bash
jq '.events[] | select(.phase == "merge")' reports/reality-tests/multi_coord_stress_009.json
jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' reports/reality-tests/multi_coord_stress_009.json
jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' reports/reality-tests/multi_coord_stress_009.json
```
---
## What's NOT in this run (Phase 1 deliberately defers)
- **48-hour clock.** Events fire as discrete steps, not on a timeline.
- **Email / SMS ingest.** No endpoints exist on the Go side yet.
- **New-resume injection mid-run.** The corpus is fixed at the start.
- **Langfuse traces.** Need Go-side wiring.
These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of.

View File

@ -23,6 +23,7 @@
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
@ -36,6 +37,8 @@ import (
"sort"
"strings"
"time"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse"
)
// ── data shapes ──────────────────────────────────────────────────
@ -184,6 +187,7 @@ func main() {
ollama = flag.String("ollama", "http://127.0.0.1:11434", "Ollama base URL (only used if --with-paraphrase-handover)")
judgeModel = flag.String("judge", "qwen2.5:latest", "Ollama model for paraphrase generation (only used if --with-paraphrase-handover)")
withParaphraseHandover = flag.Bool("with-paraphrase-handover", false, "after the verbatim handover phase, run a paraphrase handover phase: Bob runs paraphrased versions of Alice's queries against Alice's playbook")
langfuseEnv = flag.String("langfuse-env", "/etc/lakehouse/langfuse.env", "path to Langfuse credentials env file (empty = skip tracing)")
)
flag.Parse()
@ -216,6 +220,24 @@ func main() {
ctx := context.Background()
_ = ctx
// Optional Langfuse client. Best-effort: missing env file or
// unreachable Langfuse just means traces don't go anywhere; the
// run still proceeds.
var lf *langfuse.Client
if *langfuseEnv != "" {
if creds, err := loadLangfuseEnv(*langfuseEnv); err == nil {
lf = langfuse.New(creds.URL, creds.PublicKey, creds.SecretKey, nil)
log.Printf("[stress] Langfuse client live → %s", creds.URL)
defer func() {
if err := lf.Flush(context.Background()); err != nil {
log.Printf("[stress] Langfuse final flush: %v", err)
}
}()
} else {
log.Printf("[stress] Langfuse skipped: %v", err)
}
}
output := Output{
Coordinators: []string{"alice", "bob", "carol"},
Contracts: []string{contracts[0].Name, contracts[1].Name, contracts[2].Name},
@ -325,6 +347,19 @@ func main() {
// fire in their preferred order); this phase verifies the
// recording surface and the search-from-inbox flow work.
log.Printf("[stress] phase 1c: inbox burst (6 events, priority-ordered)")
var inboxTraceID string
if lf != nil {
inboxTraceID = lf.Trace(ctx, langfuse.TraceInput{
Name: "multi_coord_stress phase 1c inbox burst",
Tags: []string{"stress", "inbox", "phase-1c"},
Metadata: map[string]any{
"hour": 9,
"corpora": corpora,
"k": *k,
"event_count": 6,
},
})
}
type inboxEvent struct {
Priority string // "urgent" | "high" | "medium" | "low"
Type string // "email" | "sms"
@ -376,40 +411,110 @@ func main() {
})
for _, ie := range inboxEvents {
// 1. Record inbox event at observerd
stepStart := time.Now()
if err := postInbox(hc, *gateway, ie.Type, ie.Sender, ie.Subject, ie.Body, ie.Priority, ie.Coord); err != nil {
log.Printf(" inbox record failed (%s): %v", ie.Priority, err)
continue
}
// 2. LLM parses the body into a structured demand. Real
// production: a small local model extracts {role, count,
// location, certs, skills, shift} from email/SMS bodies
// instead of a coordinator typing it into a form. Test
// captures both raw body and parsed structure for review.
if lf != nil && inboxTraceID != "" {
lf.Span(ctx, langfuse.SpanInput{
TraceID: inboxTraceID,
Name: "observerd.inbox.record",
Input: map[string]any{"type": ie.Type, "sender": ie.Sender, "priority": ie.Priority, "subject": ie.Subject, "body_chars": len(ie.Body)},
Output: map[string]any{"accepted": true},
StartTime: stepStart,
EndTime: time.Now(),
Metadata: map[string]any{"coordinator": ie.Coord},
})
}
// 2. LLM parses the body into a structured demand.
parseStart := time.Now()
parsed, perr := parseInboxDemand(hc, *ollama, *judgeModel, ie.Body)
parseEnd := time.Now()
if perr != nil {
if lf != nil && inboxTraceID != "" {
lf.Span(ctx, langfuse.SpanInput{
TraceID: inboxTraceID,
Name: "llm.parse_demand",
Input: map[string]any{"body": ie.Body, "model": *judgeModel},
Output: map[string]any{"error": perr.Error()},
StartTime: parseStart,
EndTime: parseEnd,
Level: "ERROR",
})
}
log.Printf(" inbox demand parse failed (%s): %v", ie.Priority, perr)
continue
}
if lf != nil && inboxTraceID != "" {
lf.Span(ctx, langfuse.SpanInput{
TraceID: inboxTraceID,
Name: "llm.parse_demand",
Input: map[string]any{"body": ie.Body, "model": *judgeModel},
Output: parsed,
StartTime: parseStart,
EndTime: parseEnd,
})
}
// 3. Build a query string from the parsed demand and search.
query := parsed.AsQuery()
coord := coordByName(coords, ie.Coord)
searchStart := time.Now()
resp, err := matrixSearch(hc, *gateway, query, corpora, *k, true, coord.PlaybookCorpus)
searchEnd := time.Now()
if err != nil {
log.Printf(" inbox-triggered search failed (%s): %v", ie.Priority, err)
continue
}
if lf != nil && inboxTraceID != "" {
topIDs := make([]string, 0, len(resp.Results))
for _, r := range resp.Results {
topIDs = append(topIDs, r.ID)
}
lf.Span(ctx, langfuse.SpanInput{
TraceID: inboxTraceID,
Name: "matrix.search",
Input: map[string]any{
"query": query,
"corpora": corpora,
"k": *k,
"playbook_corpus": coord.PlaybookCorpus,
},
Output: map[string]any{
"top_k_ids": topIDs,
"top1_distance": firstDistance(resp.Results),
"playbook_boosted": resp.PlaybookBoosted,
"playbook_injected": resp.PlaybookInjected,
},
StartTime: searchStart,
EndTime: searchEnd,
})
}
ev := captureEvent("inbox-triggered-search", 9, ie.Coord, "inbox-burst", ie.Subject, query, 1, true, coord.PlaybookCorpus, resp)
parsedJSON, _ := json.Marshal(parsed)
ev.Note = fmt.Sprintf("inbox %s/%s from %s · LLM-parsed demand: %s", ie.Type, ie.Priority, ie.Sender, string(parsedJSON))
// 4. Judge re-rates top-1 against the ORIGINAL body — not the
// parsed query. Catches the case where parsing dropped a
// constraint (or where the corpus has no real match for the
// asked specialist, e.g. "drone surveyor" against a corpus
// of warehouse workers — the closest semantic neighbor will
// have a tight distance but not actually fit).
// 4. Judge re-rates top-1 against the ORIGINAL body.
if len(resp.Results) > 0 {
judgeStart := time.Now()
rating := judgeInboxResult(hc, *ollama, *judgeModel, ie.Body, resp.Results[0])
ev.JudgeRating = rating
if lf != nil && inboxTraceID != "" {
lf.Span(ctx, langfuse.SpanInput{
TraceID: inboxTraceID,
Name: "llm.judge_top1",
Input: map[string]any{
"original_body": ie.Body,
"top1_id": resp.Results[0].ID,
"top1_corpus": resp.Results[0].Corpus,
},
Output: map[string]any{"rating": rating},
StartTime: judgeStart,
EndTime: time.Now(),
})
}
}
output.Events = append(output.Events, ev)
}
@ -948,6 +1053,60 @@ func ingestFreshWorker(hc *http.Client, gw, id, text string, metadata map[string
return nil
}
// langfuseEnvCreds holds the three values loadLangfuseEnv pulls out
// of a langfuse.env credentials file.
type langfuseEnvCreds struct {
	URL       string
	PublicKey string
	SecretKey string
}

// loadLangfuseEnv parses a key=value env file (one assignment per
// non-comment line) and pulls LANGFUSE_URL, LANGFUSE_PUBLIC_KEY,
// LANGFUSE_SECRET_KEY. All three required; missing any returns an
// error so callers can skip-with-warning rather than silently
// run without tracing.
func loadLangfuseEnv(path string) (*langfuseEnvCreds, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var creds langfuseEnvCreds
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		// Skip blanks, comments, and lines without an assignment.
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		key, val, found := strings.Cut(line, "=")
		if !found {
			continue
		}
		switch strings.TrimSpace(key) {
		case "LANGFUSE_URL":
			creds.URL = strings.TrimSpace(val)
		case "LANGFUSE_PUBLIC_KEY":
			creds.PublicKey = strings.TrimSpace(val)
		case "LANGFUSE_SECRET_KEY":
			creds.SecretKey = strings.TrimSpace(val)
		}
	}
	if err := sc.Err(); err != nil {
		return nil, err
	}
	if creds.URL == "" || creds.PublicKey == "" || creds.SecretKey == "" {
		return nil, fmt.Errorf("langfuse env missing one of URL/PUBLIC_KEY/SECRET_KEY")
	}
	return &creds, nil
}
// firstDistance returns the distance of the top-ranked result, or 0
// when the result set is empty.
func firstDistance(results []matrixResult) float32 {
	if len(results) > 0 {
		return results[0].Distance
	}
	return 0
}
// parsedDemand is the LLM-extracted structure from an inbox message
// body — what a real coordinator would type into a search form.
// Empty fields are honest: the body didn't say.