lakehouse/auditor/fixtures/hybrid_38_40_45.ts
profit c5da680add Fixture: unique-per-run nonce eliminates state-pollution false positive
After the serde fix (PR #2, fix/upsert-outcome-serde) landed on main,
re-running this fixture STILL reported "doc_refs field is empty" —
but with a different root cause than the panic.

Root cause: pre-fix runs panicked on response serialization but had
already added entries to state (panic happened between upsert_entry
returning and the handler's serde_json::json! of the response). So
state.json was polluted with __auditor_test_worker__ entries from
those runs, WITHOUT doc_refs (doc_refs wasn't even wired at the time
those state rows were written).

The fixture's `find(endorsed_names.includes(TEST_WORKER_NAME))` was
picking the oldest polluted entry, not the fresh one.

Compounding factor, discovered while investigating: upsert_entry's
UPDATE branch only merges endorsed_names; doc_refs, schema_fingerprint,
and valid_until are silently dropped on an UPDATE. Filed as task #12,
separate PR to follow.
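
For the record, a rough sketch of the merge the UPDATE branch should be
doing (TypeScript-style pseudocode; field names are the ones this
fixture sends, not a quote of the gateway source):

    // hypothetical UPDATE merge: union the names, prefer incoming
    // values for the fields that are currently being dropped
    existing.endorsed_names = [...new Set([...existing.endorsed_names, ...incoming.endorsed_names])];
    existing.doc_refs = incoming.doc_refs ?? existing.doc_refs;
    existing.schema_fingerprint = incoming.schema_fingerprint ?? existing.schema_fingerprint;
    existing.valid_until = incoming.valid_until ?? existing.valid_until;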

Fix in this fixture: use a nonce suffix on both TEST_WORKER_NAME and
TEST_OPERATION so every run is guaranteed to hit the ADD path in
upsert_entry, sidestepping the UPDATE bug AND eliminating state
pollution entirely.

Live re-run after this edit:
  ✓ Phase 38    /v1/chat            449ms, 42 tokens
  ✓ Phase 40    Langfuse trace       20ms
  ✓ Phase 45.1  seed + doc_refs     239ms, doc_refs.length=1 persisted
  ✓ Phase 45.2  bridge diff           2ms, drifted=true
  ✗ Phase 45.3  drift-check           HONEST 404 (endpoint not built)

shipped_phases: [38, 40, 45.1, 45.2]  (was [38, 40, 45.2])
placeholder:    [45.3]                 (was [45.1, 45.3])

One fewer placeholder — exactly because the serde fix merged on
fix/upsert-outcome-serde and the fixture now cleanly exercises the
path. The loop is:
  fixture finds bug → PR fixes bug → fixture re-run confirms fix →
  one fewer placeholder.
2026-04-22 03:50:46 -05:00


// The (formerly never-run) hybrid test fixture. Exercises the full stack end-to-end
// across Phase 38 (/v1/chat), Phase 40 (Langfuse tracing), Phase 45
// slice 1 (DocRef on playbook), and Phase 45 slice 2 (context7 bridge
// drift detection).
//
// Returns HONEST per-layer results. A layer that's unimplemented or
// broken returns ok=false with a real error; overall verdict flags
// which phases are genuinely shipped vs still placeholder.
//
// Deterministic: always runs the same payload so a run's output is
// comparable across audits. Uses TEST-prefixed names so playbook
// state doesn't get polluted with production-looking fixture data.
//
// Preconditions:
// - gateway up on :3100
// - Python sidecar up on :3200
// - Ollama local + Ollama Cloud key loaded
// - Langfuse up on :3001
// - context7 bridge up on :3900 (manual-start per mcp-server/README)
// If bridge isn't running, that layer reports ok=false with a
// clear error — the fixture doesn't try to auto-start it.
import { readFile } from "node:fs/promises";
const GATEWAY = process.env.LH_GATEWAY_URL ?? "http://localhost:3100";
const LANGFUSE = process.env.LANGFUSE_URL ?? "http://localhost:3001";
const BRIDGE = process.env.CONTEXT7_BRIDGE_URL ?? "http://localhost:3900";
const FIXTURE_ID = "auditor:hybrid_38_40_45:v1";
const TEST_WORKER_NAME = "__auditor_test_worker__";
const STALE_HASH = "stale-hash-for-drift-proof";
// Unique-per-run identifiers so each fixture invocation hits the ADD
// path in upsert_entry (not UPDATE/NOOP on a leftover from a prior
// run). Catches state pollution + avoids interaction with task #12
// (UPDATE branch silently drops doc_refs).
const RUN_NONCE = Date.now().toString(36);
const TEST_WORKER_NAME_VERSIONED = `__auditor_test_worker_${RUN_NONCE}__`;
const TEST_OPERATION_VERSIONED = `TEST: fill: __auditor_${RUN_NONCE}__ x1 in TestCity, TC`;
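// Example shape only: a RUN_NONCE such as "mbqz3k1x" (Date.now() rendered in
// base 36) produces "__auditor_test_worker_mbqz3k1x__"; the exact value is
// irrelevant, it only has to differ between runs.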
export interface LayerResult {
layer: string;
phase: string;
ok: boolean;
latency_ms: number;
evidence: string;
error?: string;
// Layer-specific numbers that downstream checks can aggregate.
metrics?: Record<string, number | string | boolean>;
}
export interface FixtureResult {
fixture: string;
ran_at: string;
overall: "pass" | "partial_pass" | "fail";
layers: LayerResult[];
real_numbers: Record<string, number>;
shipped_phases: string[];
placeholder_phases: string[];
notes: string[];
}
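// Illustrative verdict shape from the live re-run recorded in the commit
// message above (latencies vary run to run; phase values are the string form
// used by LayerResult.phase):
//   overall:            "partial_pass"
//   shipped_phases:     ["38", "40", "45.1", "45.2"]
//   placeholder_phases: ["45.3"]
//   real_numbers:       { "phase38_chat_latency_ms": 449, ... }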
// Wraps a layer's async body with timing + structured error capture.
// On throw: returns ok=false with the error string as evidence, so
// downstream verdict code treats it as a layer failure rather than
// a fixture crash.
async function measureLayer(
layer: string,
phase: string,
body: () => Promise<{ evidence: string; metrics?: Record<string, number | string | boolean> }>,
): Promise<LayerResult> {
const t0 = Date.now();
try {
const { evidence, metrics } = await body();
return {
layer, phase, ok: true,
latency_ms: Date.now() - t0,
evidence, metrics,
};
} catch (e) {
// Normalize non-Error throws so a thrown string can't crash the wrapper.
const msg = e instanceof Error ? e.message : String(e);
return {
layer, phase, ok: false,
latency_ms: Date.now() - t0,
evidence: `FAILED: ${msg.slice(0, 200)}`,
error: msg,
};
}
}
async function langfuseAuth(): Promise<{ pk: string; sk: string } | null> {
// Default credentials match mcp-server/tracing.ts. Same sources the
// Rust side uses in gateway/src/v1/langfuse_trace.rs.
const pk = process.env.LANGFUSE_PUBLIC_KEY ?? "pk-lf-staffing";
const sk = process.env.LANGFUSE_SECRET_KEY ?? "sk-lf-staffing-secret";
if (!pk || !sk) return null;
return { pk, sk };
}
export async function runHybridFixture(): Promise<FixtureResult> {
const result: FixtureResult = {
fixture: FIXTURE_ID,
ran_at: new Date().toISOString(),
overall: "fail",
layers: [],
real_numbers: {},
shipped_phases: [],
placeholder_phases: [],
notes: [],
};
// ========================================================================
// Layer 1 — Phase 38: POST /v1/chat returns valid OpenAI shape
// ========================================================================
// Captured HERE, immediately before the chat layer runs, so layer 2's
// Langfuse-trace filter uses the actual moment the chat call was
// attempted — not the fixture start time. Earlier draft had a
// meaningless ternary returning result.ran_at on both branches; the
// LLM-Team codereview (2026-04-22) caught this and flagged it as a
// false-negative window on traces created between fixture-start and
// chat-fetch.
const chat_request_sent_ms = Date.now();
const l1 = await measureLayer("phase38_chat", "38", async () => {
const r = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
model: "qwen3.5:latest",
messages: [
{ role: "system", content: "Respond in 8 words or fewer." },
{ role: "user", content: "Name one Docker command." },
],
max_tokens: 60,
temperature: 0.2,
}),
signal: AbortSignal.timeout(60000),
});
if (!r.ok) throw new Error(`gateway ${r.status}: ${await r.text()}`);
const j: any = await r.json();
const content = j.choices?.[0]?.message?.content;
if (typeof content !== "string" || content.length === 0) {
throw new Error(`empty content — think-false default may not be wired: ${JSON.stringify(j).slice(0, 200)}`);
}
return {
evidence: `content="${content.slice(0, 60)}" tokens=${j.usage?.total_tokens}`,
metrics: {
prompt_tokens: j.usage?.prompt_tokens ?? 0,
completion_tokens: j.usage?.completion_tokens ?? 0,
total_tokens: j.usage?.total_tokens ?? 0,
content_nonempty: true,
},
};
});
result.layers.push(l1);
// ========================================================================
// Layer 2 — Phase 40: Langfuse trace lands within 3s
// ========================================================================
// Fire-and-forget tracing means we need a brief sleep before query.
await new Promise(res => setTimeout(res, 2500));
const l2 = await measureLayer("phase40_langfuse_trace", "40", async () => {
const auth = await langfuseAuth();
if (!auth) throw new Error("Langfuse credentials not in env");
const r = await fetch(`${LANGFUSE}/api/public/traces?limit=5&name=v1.chat:ollama`, {
headers: { Authorization: `Basic ${btoa(`${auth.pk}:${auth.sk}`)}` },
signal: AbortSignal.timeout(10000),
});
if (!r.ok) throw new Error(`langfuse ${r.status}: ${await r.text()}`);
const j: any = await r.json();
const items = Array.isArray(j.data) ? j.data : [];
// Filter on the chat-request timestamp captured above. A Langfuse
// trace must be newer than the moment we fired /v1/chat to plausibly
// belong to our request. Using fixture start time (result.ran_at)
// was wrong and could false-negative on slow fixtures.
const recent = items.filter((t: any) => Date.parse(t.timestamp) >= chat_request_sent_ms);
if (recent.length === 0) {
throw new Error(`no v1.chat:ollama trace since ${new Date(chat_request_sent_ms).toISOString()} (${items.length} older traces visible, Langfuse reachable — tracing is not firing)`);
}
const trace = recent[0];
return {
evidence: `trace ${trace.id.slice(0, 8)} name=${trace.name} age=${Date.now() - Date.parse(trace.timestamp)}ms`,
metrics: {
trace_age_ms: Date.now() - Date.parse(trace.timestamp),
trace_latency_reported: Number(trace.latency ?? 0),
recent_trace_count: recent.length,
},
};
});
result.layers.push(l2);
// ========================================================================
// Layer 3 — Phase 45 slice 1: /seed accepts doc_refs and persists them
// ========================================================================
// Seed a playbook with doc_refs referencing Docker at a stale hash.
// Ignore the cleanup concern for v1 — this fixture pollutes the
// in-memory playbook state; operator can retire test entries.
const l3 = await measureLayer("phase45_seed_with_doc_refs", "45.1", async () => {
const r = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
operation: TEST_OPERATION_VERSIONED,
approach: "auditor hybrid fixture run",
context: "doc_refs integration test — retire after audit",
endorsed_names: [TEST_WORKER_NAME_VERSIONED],
append: true,
doc_refs: [
{
tool: "docker",
version_seen: "24.0.7",
snippet_hash: STALE_HASH,
source_url: "https://context7.com/docker/docs",
seen_at: new Date().toISOString(),
},
],
}),
signal: AbortSignal.timeout(30000),
});
if (!r.ok) throw new Error(`seed ${r.status}: ${await r.text()}`);
const j: any = await r.json();
// Verify doc_refs actually persisted — read state.json directly.
// That's the canonical source of truth; the seed response may or
// may not echo doc_refs back.
const state_raw = await readFile("/home/profit/lakehouse/data/_playbook_memory/state.json", "utf8");
const state = JSON.parse(state_raw);
const entries = state.entries ?? [];
const ours = entries.find((e: any) => (e.endorsed_names ?? []).includes(TEST_WORKER_NAME_VERSIONED));
if (!ours) throw new Error(`seeded entry not found in state.json (worker ${TEST_WORKER_NAME_VERSIONED} not present)`);
const docRefs = ours.doc_refs ?? [];
if (docRefs.length === 0) {
throw new Error("entry saved but doc_refs field is empty — the field accepted by the API isn't being persisted");
}
return {
evidence: `playbook ${ours.playbook_id.slice(0, 16)} persisted with doc_refs.length=${docRefs.length}; first tool=${docRefs[0]?.tool} hash=${docRefs[0]?.snippet_hash}`,
metrics: {
doc_refs_stored: docRefs.length,
entries_after: j.entries_after ?? -1,
playbook_id: ours.playbook_id,
mode: j.outcome?.mode ?? "unknown",
},
};
});
result.layers.push(l3);
// ========================================================================
// Layer 4 — Phase 45 slice 2: context7 bridge detects drift vs stale hash
// ========================================================================
const l4 = await measureLayer("phase45_bridge_diff", "45.2", async () => {
// Health check first — if bridge isn't running, fail with a clear
// message rather than mysterious connection refused.
try {
const h = await fetch(`${BRIDGE}/health`, { signal: AbortSignal.timeout(3000) });
if (!h.ok) throw new Error(`bridge /health ${h.status}`);
} catch (e) {
throw new Error(`context7 bridge at ${BRIDGE} is not reachable (bridge is manual-start; run 'bun run mcp-server/context7_bridge.ts'): ${(e as Error).message}`);
}
const r = await fetch(`${BRIDGE}/docs/docker/diff?since=${encodeURIComponent(STALE_HASH)}`, {
signal: AbortSignal.timeout(30000),
});
if (!r.ok) throw new Error(`bridge diff ${r.status}: ${await r.text()}`);
const j: any = await r.json();
if (j.drifted !== true) {
throw new Error(`expected drifted=true against ${STALE_HASH}; got ${JSON.stringify(j)}`);
}
return {
evidence: `bridge confirms drift: current=${j.current_snippet_hash} previous=${j.previous_snippet_hash} upstream_updated=${j.last_updated_upstream}`,
metrics: {
drifted: true,
current_hash_length: String(j.current_snippet_hash ?? "").length,
library_id: j.library_id,
},
};
});
result.layers.push(l4);
// ========================================================================
// Layer 5 — Phase 45 slice 3: /doc_drift/check/{id} endpoint
// (EXPECTED TO FAIL — endpoint unimplemented)
// ========================================================================
const l5 = await measureLayer("phase45_slice3_drift_flag", "45.3", async () => {
const pid = String(l3.metrics?.playbook_id ?? "");
if (!pid) throw new Error("layer 3 did not produce a playbook_id to check");
const r = await fetch(`${GATEWAY}/vectors/playbook_memory/doc_drift/check/${encodeURIComponent(pid)}`, {
method: "POST",
signal: AbortSignal.timeout(10000),
});
if (r.status === 404 || r.status === 405) {
// This is the HONEST signal: the endpoint doesn't exist yet.
// Fail this layer with a clear marker — not a silent pass.
throw new Error(`endpoint unimplemented (${r.status}) — Phase 45 slice 3 is still placeholder. doc_refs are persisted (layer 3) and the bridge can answer drift questions (layer 4), but nothing ties them together into a playbook flag yet.`);
}
if (!r.ok) throw new Error(`drift/check ${r.status}: ${await r.text()}`);
const j: any = await r.json();
// If the endpoint DID exist, assert the flag got set.
if (!j.flagged) {
throw new Error(`endpoint responded but flag not set: ${JSON.stringify(j).slice(0, 200)}`);
}
return {
evidence: `playbook flagged: ${JSON.stringify(j).slice(0, 160)}`,
metrics: { flagged: true },
};
});
result.layers.push(l5);
// ========================================================================
// Verdict assembly
// ========================================================================
for (const l of result.layers) {
if (l.ok) result.shipped_phases.push(l.phase);
else result.placeholder_phases.push(l.phase);
result.real_numbers[`${l.layer}_latency_ms`] = l.latency_ms;
if (l.metrics) {
for (const [k, v] of Object.entries(l.metrics)) {
if (typeof v === "number") result.real_numbers[`${l.layer}.${k}`] = v;
}
}
}
const anyFail = result.placeholder_phases.length > 0;
const allPass = result.layers.every(l => l.ok);
result.overall = allPass ? "pass" : (result.shipped_phases.length > 0 ? "partial_pass" : "fail");
if (!l5.ok) {
result.notes.push(
"Phase 45 slice 3 (doc_drift/check endpoint) is the expected-fail layer. " +
"Layer 5 failing with a 404/405 is the CORRECT honest signal — don't mask this " +
"to get a green result. When slice 3 ships, layer 5 flips to ok=true automatically."
);
}
if (anyFail) {
result.notes.push(
`${result.placeholder_phases.length} layer(s) failed. Check evidence per layer for specifics.`
);
}
return result;
}
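// Illustrative usage only; how the auditor actually invokes this fixture is
// outside this file. A minimal sketch, assuming a Bun/ESM context with
// top-level await:
//
//   import { runHybridFixture } from "./hybrid_38_40_45";
//   const result = await runHybridFixture();
//   console.log(JSON.stringify(result, null, 2));
//   process.exitCode = result.overall === "fail" ? 1 : 0;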