golangLAKEHOUSE/internal/materializer/materializer_test.go
root 89ca72d471 materializer + replay ports + vectord substrate fix verified at scale
Two threads landing together — the doc edits interleave so they ship
in a single commit.

1. **vectord substrate fix verified at original scale** (closes the
   2026-05-01 thread). Re-ran multitier 5min @ conc=50: 132,211
   scenarios at 438/sec, 6/6 classes at 0% failure (was 4/6 pre-fix).
   Throughput dropped 1,115 → 438/sec because previously-broken
   scenarios now do real HNSW Add work — honest cost of correctness.
   The fix (i.vectors side-store + safeGraphAdd recover wrappers +
   smallIndexRebuildThreshold=32 + saveTask coalescing) holds at the
   footprint that originally surfaced the bug.

2. **Materializer port** — internal/materializer + cmd/materializer +
   scripts/materializer_smoke.sh. Ports scripts/distillation/transforms.ts
   (12 transforms) + build_evidence_index.ts (idempotency, day-partition,
   receipt). On-wire JSON shape matches TS so Bun and Go runs are
   interchangeable. 14 tests green.

3. **Replay port** — internal/replay + cmd/replay +
   scripts/replay_smoke.sh. Ports scripts/distillation/replay.ts
   (retrieve → bundle → /v1/chat → validate → log). Closes audit-FULL
   phase 7 live invocation on the Go side. Both runtimes append to the
   same data/_kb/replay_runs.jsonl (schema=replay_run.v1). 14 tests green.

Side effect on internal/distillation/types.go: EvidenceRecord gained
prompt_tokens, completion_tokens, and metadata fields to mirror the TS
shape the materializer transforms produce.

STATE_OF_PLAY refreshed to 2026-05-02; ARCHITECTURE_COMPARISON decisions
tracker moves the materializer + replay items from _open_ to DONE and
adds the substrate-fix scale verification row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 03:31:02 -05:00

219 lines
6.8 KiB
Go

package materializer
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
)
// TestMaterializeAll_RoundTrip writes a fixture source jsonl, runs the
// materializer, and checks every contract: receipt, output rows,
// idempotency on second run.
func TestMaterializeAll_RoundTrip(t *testing.T) {
	root := t.TempDir()
	mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
		`{"run_id":"r1","source_label":"lab-a","created_at":"2026-04-26T00:00:00Z","extractor":"qwen3.5:latest","text":"first"}
{"run_id":"r2","source_label":"lab-b","created_at":"2026-04-26T01:00:00Z","extractor":"qwen3.5:latest","text":"second"}`)
	transforms := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}
	first, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: transforms,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("first run: %v", err)
	}
	if !first.Receipt.ValidationPass {
		t.Errorf("first run should pass validation. errors=%v warnings=%v", first.Receipt.Errors, first.Receipt.Warnings)
	}
	if first.Totals.RowsRead != 2 || first.Totals.RowsWritten != 2 || first.Totals.RowsSkipped != 0 {
		t.Errorf("first run counts wrong: %+v", first.Totals)
	}
	if first.Totals.RowsDeduped != 0 {
		t.Errorf("first run should have 0 dedupes, got %d", first.Totals.RowsDeduped)
	}
	// Day partition comes from RecordedAt (2026-05-02), not the rows'
	// created_at.
	outPath := filepath.Join(root, "data/evidence/2026/05/02/distilled_facts.jsonl")
	rows := readJSONL(t, outPath)
	if len(rows) != 2 {
		t.Fatalf("expected 2 output rows, got %d", len(rows))
	}
	for _, r := range rows {
		// Checked assertions: a missing field or wrong dynamic type
		// should fail the test, not panic the whole test binary.
		sv, ok := r["schema_version"].(float64)
		if !ok || sv != 1 {
			t.Errorf("schema_version wrong: %v", r["schema_version"])
		}
		prov, ok := r["provenance"].(map[string]any)
		if !ok {
			t.Errorf("provenance missing or not an object: %v", r["provenance"])
			continue
		}
		if prov["source_file"] != "data/_kb/distilled_facts.jsonl" {
			t.Errorf("provenance.source_file: %v", prov["source_file"])
		}
		if prov["recorded_at"] != "2026-05-02T00:00:00Z" {
			t.Errorf("provenance.recorded_at: %v", prov["recorded_at"])
		}
	}
	// Second run with identical input + RecordedAt → all rows should
	// dedup, nothing newly written.
	second, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: transforms,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("second run: %v", err)
	}
	if second.Totals.RowsRead != 2 || second.Totals.RowsWritten != 0 || second.Totals.RowsDeduped != 2 {
		t.Errorf("idempotency broken; second run counts: %+v", second.Totals)
	}
	rows2 := readJSONL(t, outPath)
	if len(rows2) != 2 {
		t.Fatalf("output file grew on idempotent rerun: %d rows", len(rows2))
	}
}
// TestMaterializeAll_BadJSONLineGoesToSkips checks that an unparseable
// line is routed to the skips file while its valid neighbors still
// materialize, and that the receipt is marked not validation-clean.
func TestMaterializeAll_BadJSONLineGoesToSkips(t *testing.T) {
	root := t.TempDir()
	mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
		`{"run_id":"r1","source_label":"a","created_at":"2026-04-26T00:00:00Z","extractor":"q","text":"t"}
not-json
{"run_id":"r2","source_label":"b","created_at":"2026-04-26T01:00:00Z","extractor":"q","text":"t2"}`)
	defs := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}
	got, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: defs,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("run: %v", err)
	}
	// The two well-formed rows bracket the bad one; both must land.
	if got.Totals.RowsWritten != 2 {
		t.Errorf("good rows should still pass through; written=%d", got.Totals.RowsWritten)
	}
	if got.Totals.RowsSkipped != 1 {
		t.Errorf("bad-json row should be in skipped bucket; got %d", got.Totals.RowsSkipped)
	}
	if got.Receipt.ValidationPass {
		t.Errorf("validation_pass should be false when any row was skipped")
	}
	// The skip must also be persisted as a record, carrying the
	// TS-compatible parse-failure reason.
	skipFile := filepath.Join(root, "data/_kb/distillation_skips.jsonl")
	skipRows := readJSONL(t, skipFile)
	if len(skipRows) != 1 {
		t.Fatalf("expected 1 skip record, got %d", len(skipRows))
	}
	if !strings.Contains(toJSON(t, skipRows[0]), "JSON.parse failed") {
		t.Errorf("skip record should mention parse failure: %v", skipRows[0])
	}
}
// TestMaterializeAll_DryRunWritesNothing verifies that DryRun still
// tallies rows but leaves no output file and no receipt on disk.
func TestMaterializeAll_DryRunWritesNothing(t *testing.T) {
	root := t.TempDir()
	mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
		`{"run_id":"r1","source_label":"a","created_at":"2026-04-26T00:00:00Z","extractor":"q","text":"t"}`)
	defs := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}
	got, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: defs,
		RecordedAt: "2026-05-02T00:00:00Z",
		DryRun:     true,
	})
	if err != nil {
		t.Fatalf("dry run: %v", err)
	}
	if got.Totals.RowsRead != 1 || got.Totals.RowsWritten != 1 {
		t.Errorf("dry run should still count, got %+v", got.Totals)
	}
	// Neither the day-partitioned output nor the receipt may exist.
	outFile := filepath.Join(root, "data/evidence/2026/05/02/distilled_facts.jsonl")
	if _, err := os.Stat(outFile); !os.IsNotExist(err) {
		t.Errorf("dry run wrote output file (should not): err=%v", err)
	}
	if _, err := os.Stat(got.ReceiptPath); !os.IsNotExist(err) {
		t.Errorf("dry run wrote receipt (should not): err=%v", err)
	}
}
// TestMaterializeAll_MissingSourceTalliedAsWarning checks that an
// absent source file is surfaced as a warning (rows_present=false)
// rather than failing validation.
func TestMaterializeAll_MissingSourceTalliedAsWarning(t *testing.T) {
	root := t.TempDir() // deliberately empty: no fixture is written
	defs := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}
	got, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: defs,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("run: %v", err)
	}
	if got.Sources[0].RowsPresent {
		t.Errorf("expected rows_present=false")
	}
	if !got.Receipt.ValidationPass {
		t.Errorf("missing source ≠ validation failure; got pass=%v warnings=%v", got.Receipt.ValidationPass, got.Receipt.Warnings)
	}
	if len(got.Receipt.Warnings) == 0 {
		t.Errorf("missing source should produce a warning")
	}
}
// ─── Helpers ─────────────────────────────────────────────────────
func mustWriteFixture(t *testing.T, root, relpath, content string) {
t.Helper()
full := filepath.Join(root, relpath)
if err := os.MkdirAll(filepath.Dir(full), 0o755); err != nil {
t.Fatalf("mkdir: %v", err)
}
if err := os.WriteFile(full, []byte(content), 0o644); err != nil {
t.Fatalf("write fixture: %v", err)
}
}
func readJSONL(t *testing.T, path string) []map[string]any {
t.Helper()
f, err := os.Open(path)
if err != nil {
t.Fatalf("open %s: %v", path, err)
}
defer f.Close()
var out []map[string]any
sc := bufio.NewScanner(f)
sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
for sc.Scan() {
line := sc.Bytes()
if len(line) == 0 {
continue
}
var row map[string]any
if err := json.Unmarshal(line, &row); err != nil {
t.Fatalf("parse %s: %v", path, err)
}
out = append(out, row)
}
if err := sc.Err(); err != nil {
t.Fatalf("scan %s: %v", path, err)
}
return out
}
// toJSON marshals v and returns the JSON text, failing the test if v
// cannot be marshaled.
func toJSON(t *testing.T, v any) string {
	t.Helper()
	out, err := json.Marshal(v)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	return string(out)
}