diff --git a/internal/langfuse/client.go b/internal/langfuse/client.go new file mode 100644 index 0000000..faef4ad --- /dev/null +++ b/internal/langfuse/client.go @@ -0,0 +1,217 @@ +// Package langfuse is a minimal Go-side client for the Langfuse v2 +// ingestion API. Mirrors the surface area we need from the Rust +// crates/gateway/src/v1/langfuse_trace.rs emitter — Trace + Span, +// nothing else yet (no scores, no observations, no datasets). +// +// Auth is Basic over public_key:secret_key. URL + creds come from +// /etc/lakehouse/langfuse.env in production; tests can pass any URL. +// +// Best-effort transport: errors are logged but don't fail the calling +// path. Lakehouse's internal services should never go down because +// Langfuse is unreachable. +package langfuse + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "sync" + "time" +) + +// Client posts traces + spans to Langfuse's ingestion endpoint. +// Events are buffered and flushed in batches. Always call Flush +// before exit; Close also flushes. +type Client struct { + url string // base URL; "/api/public/ingestion" is appended to it verbatim + auth string // pre-encoded "Basic ..." + hc *http.Client + mu sync.Mutex // guards pending + pending []event + maxBatch int // pending length at which queue triggers a flush +} + +// New constructs a Client. URL like "http://localhost:3001"; creds +// from langfuse.env. nil hc → uses default with 5s timeout. +// The URL is concatenated with "/api/public/ingestion" as-is, so it +// should not end in a slash. maxBatch is fixed at 50. +func New(url, publicKey, secretKey string, hc *http.Client) *Client { + if hc == nil { + hc = &http.Client{Timeout: 5 * time.Second} + } + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(publicKey+":"+secretKey)) + return &Client{ + url: url, + auth: auth, + hc: hc, + maxBatch: 50, + } +} + +// NewID returns a hex string suitable as a trace/span id. Langfuse +// accepts arbitrary strings; a 16-byte random hex is unambiguous. +// The result is 32 hex characters. The error from rand.Read is +// deliberately discarded. +func NewID() string { + b := make([]byte, 16) + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} + +// event is one Langfuse ingestion envelope. 
Body shape varies by +// type (trace-create vs span-create); we use map[string]any to +// keep the wire shape declarative. +type event struct { + ID string `json:"id"` + Type string `json:"type"` // "trace-create" | "span-create" + Timestamp string `json:"timestamp"` + Body map[string]any `json:"body"` +} + +// TraceInput is what callers fill in when starting a trace. +type TraceInput struct { + Name string + UserID string + Input any + Metadata map[string]any + Tags []string +} + +// Trace records a top-level trace. Returns the trace id so callers +// can attach spans. Best-effort: errors are logged and the trace +// id is still returned so callers don't need error-handling for the +// common case. +func (c *Client) Trace(ctx context.Context, t TraceInput) string { + id := NewID() + body := map[string]any{ + "id": id, + "name": t.Name, + } + if t.UserID != "" { + body["userId"] = t.UserID + } + if t.Input != nil { + body["input"] = t.Input + } + if t.Metadata != nil { + body["metadata"] = t.Metadata + } + if len(t.Tags) > 0 { + body["tags"] = t.Tags + } + c.queue(event{ + ID: NewID(), + Type: "trace-create", + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), + Body: body, + }) + return id +} + +// SpanInput is what callers fill in when recording a span. +type SpanInput struct { + TraceID string + ParentID string // optional — for nested spans + Name string + Input any + Output any + Metadata map[string]any + StartTime time.Time + EndTime time.Time + StatusCode int // 0 = success, anything else = error code + Level string // "DEBUG" | "DEFAULT" | "WARNING" | "ERROR" +} + +// Span records one span attached to a trace. Returns the span id. 
+func (c *Client) Span(ctx context.Context, s SpanInput) string { + id := NewID() + body := map[string]any{ + "id": id, + "traceId": s.TraceID, + "name": s.Name, + } + if s.ParentID != "" { + body["parentObservationId"] = s.ParentID + } + if s.Input != nil { + body["input"] = s.Input + } + if s.Output != nil { + body["output"] = s.Output + } + if s.Metadata != nil { + body["metadata"] = s.Metadata + } + if !s.StartTime.IsZero() { + body["startTime"] = s.StartTime.UTC().Format(time.RFC3339Nano) + } + if !s.EndTime.IsZero() { + body["endTime"] = s.EndTime.UTC().Format(time.RFC3339Nano) + } + if s.Level != "" { + body["level"] = s.Level + } + if s.StatusCode != 0 { + body["statusMessage"] = fmt.Sprintf("status_code=%d", s.StatusCode) + } + c.queue(event{ + ID: NewID(), + Type: "span-create", + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), + Body: body, + }) + return id +} + +func (c *Client) queue(e event) { + c.mu.Lock() + c.pending = append(c.pending, e) + shouldFlush := len(c.pending) >= c.maxBatch + c.mu.Unlock() + if shouldFlush { + _ = c.Flush(context.Background()) + } +} + +// Flush sends all queued events in one batch. Best-effort: returns +// the error but also logs; callers can ignore. 
+func (c *Client) Flush(ctx context.Context) error { + c.mu.Lock() + if len(c.pending) == 0 { + c.mu.Unlock() + return nil + } + batch := c.pending + c.pending = nil + c.mu.Unlock() + + body, err := json.Marshal(map[string]any{"batch": batch}) + if err != nil { + slog.Warn("langfuse: marshal batch", "err", err, "n", len(batch)) + return err + } + req, err := http.NewRequestWithContext(ctx, "POST", c.url+"/api/public/ingestion", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Authorization", c.auth) + req.Header.Set("Content-Type", "application/json") + resp, err := c.hc.Do(req) + if err != nil { + slog.Warn("langfuse: post", "err", err, "n", len(batch)) + return err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 && resp.StatusCode != 207 { + slog.Warn("langfuse: non-2xx", "status", resp.StatusCode, "n", len(batch)) + return fmt.Errorf("langfuse ingestion: HTTP %d", resp.StatusCode) + } + return nil +} + +// Close flushes any remaining events. Idempotent: once the queue is +// empty, Flush returns nil without issuing a request. The flush runs +// under context.Background(), so only the http.Client's own timeout +// bounds it. +func (c *Client) Close() error { + return c.Flush(context.Background()) +} diff --git a/reports/reality-tests/multi_coord_stress_009.md b/reports/reality-tests/multi_coord_stress_009.md new file mode 100644 index 0000000..2637c41 --- /dev/null +++ b/reports/reality-tests/multi_coord_stress_009.md @@ -0,0 +1,82 @@ +# Multi-Coordinator Stress Test — Run 009 + +**Generated:** 2026-04-30T21:23:59.011167722Z +**Coordinators:** alice / bob / carol (each with own playbook namespace: `playbook_alice` / `playbook_bob` / `playbook_carol`) +**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction +**Corpora:** `workers,ethereal_workers` +**K per query:** 8 +**Total events captured:** 67 +**Evidence:** `reports/reality-tests/multi_coord_stress_009.json` + +--- + +## Diversity — is the system locking into scenarios or cycling? 
+ +| Metric | Mean Jaccard | n pairs | Interpretation | +|---|---:|---:|---| +| Same role across different contracts | 0.015873015873015872 | 9 | Lower = more diverse (different region/cert mix → different workers) | +| Different roles within same contract | 0.015343915343915345 | 18 | Should be near-zero (different roles = different worker pools) | + +**Healthy ranges:** +- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract. +- Different roles same contract: < 0.10 means role-specific retrieval is working. +- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent. + +--- + +## Determinism — same query reissued, top-K stability + +| Metric | Value | +|---|---:| +| Mean Jaccard on retrieval-only reissue | 1 | +| Number of reissue pairs | 12 | + +**Interpretation:** +- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query." +- 0.80 – 0.95: Some HNSW or embed variance, acceptable. +- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall). + +--- + +## Learning — handover hit rate + +Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results? 
+ +| Metric | Value | +|---|---:| +| Verbatim handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (verbatim) | 4 | +| Alice's recorded answer in Bob's top-K (verbatim) | 4 | +| **Verbatim handover hit rate (top-1)** | **1** | +| Paraphrase handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (paraphrase) | 4 | +| Alice's recorded answer in Bob's top-K (paraphrase) | 4 | +| **Paraphrase handover hit rate (top-1)** | **1** | + +**Interpretation:** +- Verbatim hit rate ≈ 1.0: trivial case — Bob runs identical queries; should always hit. +- Paraphrase hit rate ≥ 0.5: institutional memory survives wording change — the harder learning property. +- Paraphrase hit rate ≈ 0.0: Bob's paraphrases drift past the inject threshold, so Alice's recordings don't activate. Same caveat as the playbook_lift paraphrase pass. + +--- + +## Per-event capture + +All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. Search by phase: + +```bash +jq '.events[] | select(.phase == "merge")' reports/reality-tests/multi_coord_stress_009.json +jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' reports/reality-tests/multi_coord_stress_009.json +jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' reports/reality-tests/multi_coord_stress_009.json +``` + +--- + +## What's NOT in this run (Phase 1 deliberately defers) + +- **48-hour clock.** Events fire as discrete steps, not on a timeline. +- **Email / SMS ingest.** No endpoints exist on the Go side yet. +- **New-resume injection mid-run.** The corpus is fixed at the start. +- **Langfuse traces.** Need Go-side wiring. + +These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of. 
diff --git a/scripts/multi_coord_stress/main.go b/scripts/multi_coord_stress/main.go index 4d9f509..e0dba2a 100644 --- a/scripts/multi_coord_stress/main.go +++ b/scripts/multi_coord_stress/main.go @@ -23,6 +23,7 @@ package main import ( + "bufio" "bytes" "context" "encoding/json" @@ -36,6 +37,8 @@ import ( "sort" "strings" "time" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse" ) // ── data shapes ────────────────────────────────────────────────── @@ -184,6 +187,7 @@ func main() { ollama = flag.String("ollama", "http://127.0.0.1:11434", "Ollama base URL (only used if --with-paraphrase-handover)") judgeModel = flag.String("judge", "qwen2.5:latest", "Ollama model for paraphrase generation (only used if --with-paraphrase-handover)") withParaphraseHandover = flag.Bool("with-paraphrase-handover", false, "after the verbatim handover phase, run a paraphrase handover phase: Bob runs paraphrased versions of Alice's queries against Alice's playbook") + langfuseEnv = flag.String("langfuse-env", "/etc/lakehouse/langfuse.env", "path to Langfuse credentials env file (empty = skip tracing)") ) flag.Parse() @@ -216,6 +220,24 @@ func main() { ctx := context.Background() _ = ctx + // Optional Langfuse client. Best-effort: missing env file or + // unreachable Langfuse just means traces don't go anywhere; the + // run still proceeds. 
+ var lf *langfuse.Client + if *langfuseEnv != "" { + if creds, err := loadLangfuseEnv(*langfuseEnv); err == nil { + lf = langfuse.New(creds.URL, creds.PublicKey, creds.SecretKey, nil) + log.Printf("[stress] Langfuse client live → %s", creds.URL) + defer func() { + if err := lf.Flush(context.Background()); err != nil { + log.Printf("[stress] Langfuse final flush: %v", err) + } + }() + } else { + log.Printf("[stress] Langfuse skipped: %v", err) + } + } + output := Output{ Coordinators: []string{"alice", "bob", "carol"}, Contracts: []string{contracts[0].Name, contracts[1].Name, contracts[2].Name}, @@ -325,6 +347,19 @@ func main() { // fire in their preferred order); this phase verifies the // recording surface and the search-from-inbox flow work. log.Printf("[stress] phase 1c: inbox burst (6 events, priority-ordered)") + var inboxTraceID string + if lf != nil { + inboxTraceID = lf.Trace(ctx, langfuse.TraceInput{ + Name: "multi_coord_stress phase 1c inbox burst", + Tags: []string{"stress", "inbox", "phase-1c"}, + Metadata: map[string]any{ + "hour": 9, + "corpora": corpora, + "k": *k, + "event_count": 6, + }, + }) + } type inboxEvent struct { Priority string // "urgent" | "high" | "medium" | "low" Type string // "email" | "sms" @@ -376,40 +411,110 @@ func main() { }) for _, ie := range inboxEvents { // 1. Record inbox event at observerd + stepStart := time.Now() if err := postInbox(hc, *gateway, ie.Type, ie.Sender, ie.Subject, ie.Body, ie.Priority, ie.Coord); err != nil { log.Printf(" inbox record failed (%s): %v", ie.Priority, err) continue } - // 2. LLM parses the body into a structured demand. Real - // production: a small local model extracts {role, count, - // location, certs, skills, shift} from email/SMS bodies - // instead of a coordinator typing it into a form. Test - // captures both raw body and parsed structure for review. 
+ if lf != nil && inboxTraceID != "" { + lf.Span(ctx, langfuse.SpanInput{ + TraceID: inboxTraceID, + Name: "observerd.inbox.record", + Input: map[string]any{"type": ie.Type, "sender": ie.Sender, "priority": ie.Priority, "subject": ie.Subject, "body_chars": len(ie.Body)}, + Output: map[string]any{"accepted": true}, + StartTime: stepStart, + EndTime: time.Now(), + Metadata: map[string]any{"coordinator": ie.Coord}, + }) + } + + // 2. LLM parses the body into a structured demand. + parseStart := time.Now() parsed, perr := parseInboxDemand(hc, *ollama, *judgeModel, ie.Body) + parseEnd := time.Now() if perr != nil { + if lf != nil && inboxTraceID != "" { + lf.Span(ctx, langfuse.SpanInput{ + TraceID: inboxTraceID, + Name: "llm.parse_demand", + Input: map[string]any{"body": ie.Body, "model": *judgeModel}, + Output: map[string]any{"error": perr.Error()}, + StartTime: parseStart, + EndTime: parseEnd, + Level: "ERROR", + }) + } log.Printf(" inbox demand parse failed (%s): %v", ie.Priority, perr) continue } + if lf != nil && inboxTraceID != "" { + lf.Span(ctx, langfuse.SpanInput{ + TraceID: inboxTraceID, + Name: "llm.parse_demand", + Input: map[string]any{"body": ie.Body, "model": *judgeModel}, + Output: parsed, + StartTime: parseStart, + EndTime: parseEnd, + }) + } + // 3. Build a query string from the parsed demand and search. 
query := parsed.AsQuery() coord := coordByName(coords, ie.Coord) + searchStart := time.Now() resp, err := matrixSearch(hc, *gateway, query, corpora, *k, true, coord.PlaybookCorpus) + searchEnd := time.Now() if err != nil { log.Printf(" inbox-triggered search failed (%s): %v", ie.Priority, err) continue } + if lf != nil && inboxTraceID != "" { + topIDs := make([]string, 0, len(resp.Results)) + for _, r := range resp.Results { + topIDs = append(topIDs, r.ID) + } + lf.Span(ctx, langfuse.SpanInput{ + TraceID: inboxTraceID, + Name: "matrix.search", + Input: map[string]any{ + "query": query, + "corpora": corpora, + "k": *k, + "playbook_corpus": coord.PlaybookCorpus, + }, + Output: map[string]any{ + "top_k_ids": topIDs, + "top1_distance": firstDistance(resp.Results), + "playbook_boosted": resp.PlaybookBoosted, + "playbook_injected": resp.PlaybookInjected, + }, + StartTime: searchStart, + EndTime: searchEnd, + }) + } ev := captureEvent("inbox-triggered-search", 9, ie.Coord, "inbox-burst", ie.Subject, query, 1, true, coord.PlaybookCorpus, resp) parsedJSON, _ := json.Marshal(parsed) ev.Note = fmt.Sprintf("inbox %s/%s from %s · LLM-parsed demand: %s", ie.Type, ie.Priority, ie.Sender, string(parsedJSON)) - // 4. Judge re-rates top-1 against the ORIGINAL body — not the - // parsed query. Catches the case where parsing dropped a - // constraint (or where the corpus has no real match for the - // asked specialist, e.g. "drone surveyor" against a corpus - // of warehouse workers — the closest semantic neighbor will - // have a tight distance but not actually fit). + + // 4. Judge re-rates top-1 against the ORIGINAL body. 
if len(resp.Results) > 0 { + judgeStart := time.Now() rating := judgeInboxResult(hc, *ollama, *judgeModel, ie.Body, resp.Results[0]) ev.JudgeRating = rating + if lf != nil && inboxTraceID != "" { + lf.Span(ctx, langfuse.SpanInput{ + TraceID: inboxTraceID, + Name: "llm.judge_top1", + Input: map[string]any{ + "original_body": ie.Body, + "top1_id": resp.Results[0].ID, + "top1_corpus": resp.Results[0].Corpus, + }, + Output: map[string]any{"rating": rating}, + StartTime: judgeStart, + EndTime: time.Now(), + }) + } } output.Events = append(output.Events, ev) } @@ -948,6 +1053,60 @@ func ingestFreshWorker(hc *http.Client, gw, id, text string, metadata map[string return nil } +type langfuseEnvCreds struct { + URL string + PublicKey string + SecretKey string +} + +// loadLangfuseEnv parses a key=value env file (one assignment per +// non-comment line) and pulls LANGFUSE_URL, LANGFUSE_PUBLIC_KEY, +// LANGFUSE_SECRET_KEY. All three required; missing any returns an +// error so callers can skip-with-warning rather than silently +// run without tracing. 
+func loadLangfuseEnv(path string) (*langfuseEnvCreds, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + creds := &langfuseEnvCreds{} + sc := bufio.NewScanner(f) + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + eq := strings.IndexByte(line, '=') + if eq < 0 { + continue + } + k, v := strings.TrimSpace(line[:eq]), strings.TrimSpace(line[eq+1:]) + switch k { + case "LANGFUSE_URL": + creds.URL = v + case "LANGFUSE_PUBLIC_KEY": + creds.PublicKey = v + case "LANGFUSE_SECRET_KEY": + creds.SecretKey = v + } + } + if err := sc.Err(); err != nil { + return nil, err + } + if creds.URL == "" || creds.PublicKey == "" || creds.SecretKey == "" { + return nil, fmt.Errorf("langfuse env missing one of URL/PUBLIC_KEY/SECRET_KEY") + } + return creds, nil +} + +func firstDistance(results []matrixResult) float32 { + if len(results) == 0 { + return 0 + } + return results[0].Distance +} + // parsedDemand is the LLM-extracted structure from an inbox message // body — what a real coordinator would type into a search form. // Empty fields are honest: the body didn't say.