package materializer

import (
	"bufio"
	"encoding/json"
	"os"
	"path/filepath"
	"strings"
	"testing"
)

// TestMaterializeAll_RoundTrip writes a fixture source jsonl, runs the
// materializer, and checks every contract: receipt, output rows,
// idempotency on second run.
func TestMaterializeAll_RoundTrip(t *testing.T) {
	root := t.TempDir()
	mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
		`{"run_id":"r1","source_label":"lab-a","created_at":"2026-04-26T00:00:00Z","extractor":"qwen3.5:latest","text":"first"}
{"run_id":"r2","source_label":"lab-b","created_at":"2026-04-26T01:00:00Z","extractor":"qwen3.5:latest","text":"second"}`)

	transforms := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}

	first, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: transforms,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("first run: %v", err)
	}
	// NOTE(review): this format string previously contained a raw newline
	// inside an interpreted string literal, which does not compile in Go.
	if !first.Receipt.ValidationPass {
		t.Errorf("first run should pass validation. errors=%v warnings=%v",
			first.Receipt.Errors, first.Receipt.Warnings)
	}
	if first.Totals.RowsRead != 2 || first.Totals.RowsWritten != 2 || first.Totals.RowsSkipped != 0 {
		t.Errorf("first run counts wrong: %+v", first.Totals)
	}
	if first.Totals.RowsDeduped != 0 {
		t.Errorf("first run should have 0 dedupes, got %d", first.Totals.RowsDeduped)
	}

	outPath := filepath.Join(root, "data/evidence/2026/05/02/distilled_facts.jsonl")
	rows := readJSONL(t, outPath)
	if len(rows) != 2 {
		t.Fatalf("expected 2 output rows, got %d", len(rows))
	}
	for _, r := range rows {
		// Checked type assertions: a malformed row should fail the test,
		// not panic it.
		if sv, ok := r["schema_version"].(float64); !ok || sv != 1 {
			t.Errorf("schema_version wrong: %v", r["schema_version"])
		}
		prov, ok := r["provenance"].(map[string]any)
		if !ok {
			t.Fatalf("provenance missing or not an object: %v", r["provenance"])
		}
		if prov["source_file"] != "data/_kb/distilled_facts.jsonl" {
			t.Errorf("provenance.source_file: %v", prov["source_file"])
		}
		if prov["recorded_at"] != "2026-05-02T00:00:00Z" {
			t.Errorf("provenance.recorded_at: %v", prov["recorded_at"])
		}
	}

	// Second run with identical input + RecordedAt → all rows should
	// dedup, nothing newly written.
	second, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: transforms,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("second run: %v", err)
	}
	if second.Totals.RowsRead != 2 || second.Totals.RowsWritten != 0 || second.Totals.RowsDeduped != 2 {
		t.Errorf("idempotency broken; second run counts: %+v", second.Totals)
	}
	rows2 := readJSONL(t, outPath)
	if len(rows2) != 2 {
		t.Fatalf("output file grew on idempotent rerun: %d rows", len(rows2))
	}
}

// TestMaterializeAll_BadJSONLineGoesToSkips verifies that an unparseable
// line is routed to the skips file, counted as skipped, and fails
// validation — while well-formed neighbors still pass through.
func TestMaterializeAll_BadJSONLineGoesToSkips(t *testing.T) {
	root := t.TempDir()
	mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
		`{"run_id":"r1","source_label":"a","created_at":"2026-04-26T00:00:00Z","extractor":"q","text":"t"}
not-json
{"run_id":"r2","source_label":"b","created_at":"2026-04-26T01:00:00Z","extractor":"q","text":"t2"}`)

	transforms := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}

	res, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: transforms,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("run: %v", err)
	}
	if res.Totals.RowsWritten != 2 {
		t.Errorf("good rows should still pass through; written=%d", res.Totals.RowsWritten)
	}
	if res.Totals.RowsSkipped != 1 {
		t.Errorf("bad-json row should be in skipped bucket; got %d", res.Totals.RowsSkipped)
	}
	if res.Receipt.ValidationPass {
		t.Errorf("validation_pass should be false when any row was skipped")
	}

	skipsPath := filepath.Join(root, "data/_kb/distillation_skips.jsonl")
	skips := readJSONL(t, skipsPath)
	if len(skips) != 1 {
		t.Fatalf("expected 1 skip record, got %d", len(skips))
	}
	if !strings.Contains(toJSON(t, skips[0]), "JSON.parse failed") {
		t.Errorf("skip record should mention parse failure: %v", skips[0])
	}
}

// TestMaterializeAll_DryRunWritesNothing checks that DryRun still tallies
// counts but leaves no output file and no receipt on disk.
func TestMaterializeAll_DryRunWritesNothing(t *testing.T) {
	root := t.TempDir()
	mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
		`{"run_id":"r1","source_label":"a","created_at":"2026-04-26T00:00:00Z","extractor":"q","text":"t"}`)

	transforms := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}

	res, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: transforms,
		RecordedAt: "2026-05-02T00:00:00Z",
		DryRun:     true,
	})
	if err != nil {
		t.Fatalf("dry run: %v", err)
	}
	if res.Totals.RowsRead != 1 || res.Totals.RowsWritten != 1 {
		t.Errorf("dry run should still count, got %+v", res.Totals)
	}

	outPath := filepath.Join(root, "data/evidence/2026/05/02/distilled_facts.jsonl")
	if _, err := os.Stat(outPath); !os.IsNotExist(err) {
		t.Errorf("dry run wrote output file (should not): err=%v", err)
	}
	if _, err := os.Stat(res.ReceiptPath); !os.IsNotExist(err) {
		t.Errorf("dry run wrote receipt (should not): err=%v", err)
	}
}

// TestMaterializeAll_MissingSourceTalliedAsWarning checks that an absent
// source file is reported as a warning (rows_present=false) without
// failing validation.
func TestMaterializeAll_MissingSourceTalliedAsWarning(t *testing.T) {
	root := t.TempDir()
	transforms := []TransformDef{
		{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	}

	res, err := MaterializeAll(MaterializeOptions{
		Root:       root,
		Transforms: transforms,
		RecordedAt: "2026-05-02T00:00:00Z",
	})
	if err != nil {
		t.Fatalf("run: %v", err)
	}
	// Guard the index so an empty Sources slice fails the test cleanly
	// instead of panicking.
	if len(res.Sources) == 0 {
		t.Fatalf("expected at least one source entry, got none")
	}
	if res.Sources[0].RowsPresent {
		t.Errorf("expected rows_present=false")
	}
	if !res.Receipt.ValidationPass {
		t.Errorf("missing source ≠ validation failure; got pass=%v warnings=%v",
			res.Receipt.ValidationPass, res.Receipt.Warnings)
	}
	if len(res.Receipt.Warnings) == 0 {
		t.Errorf("missing source should produce a warning")
	}
}

// ─── Helpers ─────────────────────────────────────────────────────

// mustWriteFixture writes content to root/relpath, creating parent
// directories, and fails the test on any error.
func mustWriteFixture(t *testing.T, root, relpath, content string) {
	t.Helper()
	full := filepath.Join(root, relpath)
	if err := os.MkdirAll(filepath.Dir(full), 0o755); err != nil {
		t.Fatalf("mkdir: %v", err)
	}
	if err := os.WriteFile(full, []byte(content), 0o644); err != nil {
		t.Fatalf("write fixture: %v", err)
	}
}

// readJSONL parses a jsonl file into one map per non-empty line, failing
// the test on open, parse, or scan errors.
func readJSONL(t *testing.T, path string) []map[string]any {
	t.Helper()
	f, err := os.Open(path)
	if err != nil {
		t.Fatalf("open %s: %v", path, err)
	}
	defer f.Close()

	var out []map[string]any
	sc := bufio.NewScanner(f)
	// Allow lines up to 16 MiB; the default scanner cap is 64 KiB.
	sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
	for sc.Scan() {
		line := sc.Bytes()
		if len(line) == 0 {
			continue
		}
		var row map[string]any
		if err := json.Unmarshal(line, &row); err != nil {
			t.Fatalf("parse %s: %v", path, err)
		}
		out = append(out, row)
	}
	if err := sc.Err(); err != nil {
		t.Fatalf("scan %s: %v", path, err)
	}
	return out
}

// toJSON marshals v to a compact JSON string, failing the test on error.
func toJSON(t *testing.T, v any) string {
	t.Helper()
	b, err := json.Marshal(v)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	return string(b)
}