profit ac01fffd9a checkpoint: matrix-agent-validated (2026-04-25)
Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.

WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.

WHAT WAS PROVEN
- Vector retrieval across multi-corpora matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
    * UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
    * REVISE: chains versions, parent.superseded_at + superseded_by stamped
    * RETIRE: marks specific trace retired with reason, excluded from retrieval
    * HISTORY: walks chain root→tip, cycle-safe

KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces

Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:43:27 -05:00

175 lines
5.8 KiB
TypeScript

// langfuse_bridge — the missing piece called out in project_lost_stack.md
// and Phase 40 PRD. Polls Langfuse `/api/public/traces` at interval,
// forwards every completed trace to observer `:3800/event` with
// `source: "langfuse"`. Observer's existing ring buffer + analyzer
// pick it up, so the KB learns from cost/latency/provider deltas per
// model — not just from scenario outcomes.
//
// Loopback: observer persistOp() appends to data/_observer/ops.jsonl
// and its aggregator produces pathway_recommendations.jsonl. This
// bridge closes the feedback loop between LLM call metadata and the
// playbook/KB learning surface.
//
// State persistence: last-seen trace timestamp written to a JSON file
// so restarts don't double-emit. Bounded forward window (50/tick) so
// first-run catch-up doesn't hammer the observer.
// Configuration (all env-overridable). Numeric knobs go through
// envNumber so an empty or malformed env value falls back to the
// default instead of producing NaN or 0 — note Number("") === 0,
// which would turn setInterval(tick, POLL_INTERVAL_MS) into a busy
// loop against Langfuse.
function envNumber(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw === "") return fallback;
  const n = Number(raw);
  return Number.isFinite(n) && n > 0 ? n : fallback;
}
const LANGFUSE_URL = process.env.LANGFUSE_URL ?? "http://localhost:3000";
const LANGFUSE_PUBLIC = process.env.LANGFUSE_PUBLIC_KEY;
const LANGFUSE_SECRET = process.env.LANGFUSE_SECRET_KEY;
const OBSERVER_URL = process.env.OBSERVER_URL ?? "http://localhost:3800";
// Poll cadence and per-tick forward cap (bounded catch-up, see header).
const POLL_INTERVAL_MS = envNumber("LANGFUSE_POLL_MS", 30000);
const BATCH_LIMIT = envNumber("LANGFUSE_BATCH_LIMIT", 50);
// Cursor file so restarts resume where the last run left off.
const STATE_FILE = process.env.LANGFUSE_STATE_FILE
  ?? "/var/lib/lakehouse-guard/langfuse_last_seen.json";
// Subset of a Langfuse trace as returned by GET /api/public/traces —
// only the fields this bridge actually reads.
interface LangfuseTrace {
id: string;
name?: string;
timestamp: string; // ISO timestamp; doubles as the polling cursor
input?: any; // shape varies per integration; summarized to 200 chars below
output?: any;
latency?: number; // seconds, per Langfuse API
totalCost?: number;
usage?: { input?: number; output?: number; total?: number }; // token counts
metadata?: any; // free-form; provider/model/error/sig_hash/task_class read if present
}
interface State { last_seen_ts?: string }
// Build the HTTP Basic auth header from the Langfuse key pair.
// Uses Buffer (a global in both Node and Bun) rather than btoa:
// btoa is Latin-1-only and deprecated in Node, so non-ASCII key
// material would throw or be corrupted.
function basicAuth(): string {
  const pair = `${LANGFUSE_PUBLIC}:${LANGFUSE_SECRET}`;
  return "Basic " + Buffer.from(pair, "utf8").toString("base64");
}
async function loadState(): Promise<State> {
try {
const f = Bun.file(STATE_FILE);
if (!(await f.exists())) return {};
return JSON.parse(await f.text()) as State;
} catch (e) {
console.warn(`[langfuse-bridge] state load failed: ${e}`);
return {};
}
}
// Persist the cursor to STATE_FILE. Best-effort: a failed write is
// logged and swallowed — worst case the next run re-fetches a batch.
async function saveState(s: State): Promise<void> {
  const serialized = JSON.stringify(s);
  try {
    await Bun.write(STATE_FILE, serialized);
  } catch (e) {
    console.warn(`[langfuse-bridge] state save failed: ${e}`);
  }
}
// Fetch up to BATCH_LIMIT traces from Langfuse, oldest first,
// optionally restricted to those at/after `cursor`. Throws on any
// non-2xx response with a clipped body for diagnostics.
async function fetchTracesSince(cursor?: string): Promise<LangfuseTrace[]> {
  const endpoint = new URL("/api/public/traces", LANGFUSE_URL);
  endpoint.searchParams.set("limit", String(BATCH_LIMIT));
  endpoint.searchParams.set("orderBy", "timestamp.asc");
  if (cursor) endpoint.searchParams.set("fromTimestamp", cursor);
  const res = await fetch(endpoint, {
    headers: { authorization: basicAuth() },
    signal: AbortSignal.timeout(10_000),
  });
  if (!res.ok) {
    const detail = (await res.text()).slice(0, 200);
    throw new Error(`langfuse ${res.status}: ${detail}`);
  }
  const payload: any = await res.json();
  return (payload.data ?? []) as LangfuseTrace[];
}
// Shape one Langfuse trace into the ObservedOp the observer expects
// (see mcp-server/observer.ts:29). `source: "langfuse"` is the
// provenance flag so the analyzer can weight traces differently from
// scenario-sourced events.
function toObservedOp(t: LangfuseTrace): Record<string, any> {
  // Clip any value to a 200-char summary; non-strings are JSON-encoded.
  const summarize = (v: unknown): string =>
    typeof v === "string" ? v.slice(0, 200) : JSON.stringify(v ?? {}).slice(0, 200);
  const endpoint =
    t.metadata?.provider ?? t.metadata?.model ?? t.name ?? "langfuse.trace";
  return {
    timestamp: t.timestamp,
    endpoint: `langfuse:${endpoint}`,
    input_summary: summarize(t.input),
    success: !t.metadata?.error,
    duration_ms: Math.round((t.latency ?? 0) * 1000), // latency is seconds → ms
    output_summary: summarize(t.output),
    source: "langfuse",
    sig_hash: t.metadata?.sig_hash,
    event_kind: t.metadata?.task_class,
    // Extra fields the observer doesn't schema but the KB aggregator
    // can still pick up via JSON passthrough.
    model: t.metadata?.model,
    provider: t.metadata?.provider,
    prompt_tokens: t.usage?.input,
    completion_tokens: t.usage?.output,
    total_tokens: t.usage?.total,
    total_cost: t.totalCost,
  };
}
// POST one ObservedOp to the observer's /event endpoint. Throws on
// any non-2xx status so the caller can halt the batch and retry the
// same trace next tick.
async function forwardToObserver(op: Record<string, any>): Promise<void> {
  const res = await fetch(`${OBSERVER_URL}/event`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(op),
    signal: AbortSignal.timeout(5_000),
  });
  if (res.ok) return;
  const detail = (await res.text()).slice(0, 200);
  throw new Error(`observer ${res.status}: ${detail}`);
}
// One poll cycle: load cursor, fetch new traces, forward each to the
// observer in timestamp order, then persist the newest forwarded
// timestamp as the next cursor. Cursor only advances past traces
// that were successfully forwarded.
async function tick(): Promise<void> {
  const state = await loadState();
  let traces: LangfuseTrace[];
  try {
    traces = await fetchTracesSince(state.last_seen_ts);
  } catch (e) {
    console.warn(`[langfuse-bridge] fetch failed: ${e}`);
    return;
  }
  // Langfuse's `fromTimestamp` filter is inclusive ("on or after"), so
  // the boundary trace that produced last_seen_ts comes back on every
  // poll. Keep only strictly-newer traces to avoid double-emitting it.
  // NOTE(review): distinct traces sharing the exact boundary timestamp
  // would also be dropped — assumed negligible at ms precision; confirm
  // against Langfuse timestamp granularity.
  const cursor = state.last_seen_ts;
  if (cursor !== undefined) {
    traces = traces.filter(t => t.timestamp > cursor);
  }
  if (traces.length === 0) {
    console.log(`[langfuse-bridge] no new traces since ${state.last_seen_ts ?? "start"}`);
    return;
  }
  let last = state.last_seen_ts ?? "";
  let forwarded = 0;
  for (const t of traces) {
    try {
      await forwardToObserver(toObservedOp(t));
      forwarded++;
      if (t.timestamp > last) last = t.timestamp;
    } catch (e) {
      console.warn(`[langfuse-bridge] forward ${t.id} failed: ${e}`);
      // Don't advance cursor on forward failure — retry next tick.
      break;
    }
  }
  if (last) await saveState({ last_seen_ts: last });
  console.log(
    `[langfuse-bridge] forwarded ${forwarded}/${traces.length}, last_seen=${last}`,
  );
}
// Entry point: require both Langfuse keys, announce the config, run
// one immediate tick, then poll on a fixed interval.
async function main(): Promise<void> {
  const haveKeys = Boolean(LANGFUSE_PUBLIC) && Boolean(LANGFUSE_SECRET);
  if (!haveKeys) {
    console.error("LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY required");
    process.exit(1);
  }
  console.log(
    `[langfuse-bridge] polling ${LANGFUSE_URL} every ${POLL_INTERVAL_MS}ms → ${OBSERVER_URL}/event`,
  );
  await tick();
  setInterval(tick, POLL_INTERVAL_MS);
}
// Top-level launch; any unhandled rejection is fatal.
main().catch((e: unknown) => {
  console.error(`[langfuse-bridge] fatal: ${e}`);
  process.exit(1);
});