root e8cf113af8 gauntlet 2026-05-02: smoke chain + per-component scrum + parity probe
Production-readiness gauntlet exploiting the dual Rust/Go
implementation as a measurement instrument.

## Phase 1 — Full smoke chain
21/21 PASS in ~60s. Substrate intact across the full service surface.

## Phase 2 — Per-component scrum (token-volume fix)
Prior wave (165KB diff): Kimi 62 tokens out, Qwen 297 → no useful
analysis. This wave splits today's commits into 4 focused bundles
(36-71KB each):
  c1 validatord (46KB) → 0 convergent / 11 distinct
  c2 vectord substrate (36KB) → 0 convergent / 10 distinct
  c3 materializer (71KB) → 0 convergent / 6 distinct (Opus emitted
                           a BLOCK then self-retracted in the same response)
  c4 replay (45KB) → 0 convergent / 10 distinct

Reviewer engagement vs prior wave: Kimi went 62 → ~250 tokens out
once bundles dropped below 60KB.

scripts/scrum_review.sh hardening:
  * Diff-size guard (warn >60KB, hard-fail >100KB,
    SCRUM_FORCE_OVERSIZE=1 override)
  * Tightened prompt — file path must appear EXACTLY as in the diff
    so the post-processor can grep WHERE: lines reliably
  * Auto-tally step dedupes by (reviewer, location); convergence
    counts distinct lineages (closes the prior `opus+opus+opus`
    false-convergence bug)
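A minimal sketch of that dedupe + distinct-lineage tally — the finding
type and field names are assumptions for illustration; the real logic
lives in scripts/scrum_review.sh's auto-tally step:

```go
package tally

// finding is one parsed WHERE: line (types assumed for this sketch).
type finding struct {
	Reviewer string // lineage, e.g. "opus", "kimi", "qwen"
	Location string // file path exactly as it appears in the diff
}

// convergence dedupes by (reviewer, location), then counts distinct
// reviewer lineages per location — so opus+opus+opus at one spot is a
// single lineage, not a false convergence.
func convergence(findings []finding) map[string]int {
	seen := map[finding]bool{}
	lineages := map[string]map[string]bool{}
	for _, f := range findings {
		if seen[f] {
			continue // dedupe (reviewer, location)
		}
		seen[f] = true
		if lineages[f.Location] == nil {
			lineages[f.Location] = map[string]bool{}
		}
		lineages[f.Location][f.Reviewer] = true
	}
	counts := make(map[string]int, len(lineages))
	for loc, reviewers := range lineages {
		counts[loc] = len(reviewers) // convergence = distinct lineages
	}
	return counts
}
```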

## Phase 3 — Cross-runtime validator parity probe (the headline finding)
scripts/cutover/parity/validator_parity.sh sends 6 identical
/v1/validate cases to Rust :3100 AND Go :4110, compares status+body.

Result: **6/6 status codes match · 5/6 body shapes diverge.**

Rust returns serde-tagged enum:   {"Schema":{"field":"x","reason":"y"}}
Go returns flat exported-fields:  {"Kind":"schema","Field":"x","Reason":"y"}

Both round-trip inside their own runtime; a caller swapping one for
the other would break parsing silently. Captured as new _open_ row
in docs/ARCHITECTURE_COMPARISON.md decisions tracker.

This is the "use the dual-implementation as a measurement instrument"
return — single-repo scrums can't catch this class of cross-runtime
drift.
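A minimal Go sketch of the status+body comparison the shell probe
performs — ports are from the text above; the JSON payload is a
stand-in, not one of the real 6 cases:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// probe POSTs one case to a validator and returns status + raw body.
func probe(url string, payload []byte) (int, string, error) {
	resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return resp.StatusCode, string(body), err
}

func main() {
	// The real probe sends 6 cases; one illustrative payload here.
	cases := [][]byte{[]byte(`{"doc":"example"}`)}
	for i, c := range cases {
		rustStatus, rustBody, _ := probe("http://localhost:3100/v1/validate", c)
		goStatus, goBody, _ := probe("http://localhost:4110/v1/validate", c)
		fmt.Printf("case %d: status_match=%v body_match=%v\n",
			i+1, rustStatus == goStatus, rustBody == goBody)
	}
}
```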

## Phase 4 — Production assessment
Verdict: ship-with-known-gap. The validator wire-format gap is
documented, not regressed; the future fix is ~50 LOC on the Go side
(a custom MarshalJSON on ValidationError to match Rust's serde shape).
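A hedged sketch of what that MarshalJSON could look like — the
ValidationError field names are assumptions read off the probe output
above, not the real Go struct:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// ValidationError mirrors the flat Go shape from the probe:
// {"Kind":"schema","Field":"x","Reason":"y"} (field names assumed).
type ValidationError struct {
	Kind   string
	Field  string
	Reason string
}

// MarshalJSON emits Rust's externally-tagged serde shape instead:
// {"Schema":{"field":"x","reason":"y"}}.
func (e ValidationError) MarshalJSON() ([]byte, error) {
	if e.Kind == "" {
		return nil, fmt.Errorf("ValidationError.Kind is empty")
	}
	variant := strings.ToUpper(e.Kind[:1]) + e.Kind[1:] // "schema" → "Schema"
	return json.Marshal(map[string]map[string]string{
		variant: {"field": e.Field, "reason": e.Reason},
	})
}

func main() {
	out, _ := json.Marshal(ValidationError{Kind: "schema", Field: "x", Reason: "y"})
	fmt.Println(string(out)) // {"Schema":{"field":"x","reason":"y"}}
}
```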

Persistent stack config (/tmp/lakehouse-persistent.toml) gains
validatord on :3221 + persistent-validatord binary so operators
bringing up the persistent stack get the new daemon automatically.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 04:05:18 -05:00

commit 89ca72d4718fcb20ba9dcc03110e090890a0736e
Author: root <root@island37.com>
Date: Sat May 2 03:31:02 2026 -0500
materializer + replay ports + vectord substrate fix verified at scale

Three threads landing together — the doc edits interleave, so they ship
in a single commit.

1. **vectord substrate fix verified at original scale** (closes the
   2026-05-01 thread). Re-ran multitier 5min @ conc=50: 132,211
   scenarios at 438/sec, 6/6 classes at 0% failure (was 4/6 pre-fix).
   Throughput dropped 1,115 → 438/sec because previously-broken
   scenarios now do real HNSW Add work — the honest cost of correctness.
   The fix (i.vectors side-store + safeGraphAdd recover wrappers +
   smallIndexRebuildThreshold=32 + saveTask coalescing) holds at the
   footprint that originally surfaced the bug; a sketch of the
   recover-wrapper pattern follows this message.

2. **Materializer port** — internal/materializer + cmd/materializer +
   scripts/materializer_smoke.sh. Ports scripts/distillation/transforms.ts
   (12 transforms) + build_evidence_index.ts (idempotency, day-partition,
   receipt). The on-wire JSON shape matches TS, so Bun and Go runs are
   interchangeable. 14 tests green.

3. **Replay port** — internal/replay + cmd/replay +
   scripts/replay_smoke.sh. Ports scripts/distillation/replay.ts
   (retrieve → bundle → /v1/chat → validate → log). Closes the audit-FULL
   phase 7 live invocation on the Go side. Both runtimes append to the
   same data/_kb/replay_runs.jsonl (schema=replay_run.v1). 14 tests green.

Side effect on internal/distillation/types.go: EvidenceRecord gained
prompt_tokens, completion_tokens, and metadata fields to mirror the TS
shape the materializer transforms produce.

STATE_OF_PLAY refreshed to 2026-05-02; the ARCHITECTURE_COMPARISON
decisions tracker moves the materializer + replay items from _open_ to
DONE and adds the substrate-fix scale-verification row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
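The safeGraphAdd wrapper named in item 1 is, per the text, a recover
wrapper around HNSW Add. A minimal sketch of the pattern, assuming
nothing about the real hnsw API beyond "Add can panic" — names here
are illustrative, not the vectord code:

```go
package vectordsketch

import "fmt"

// safeAdd runs an add closure and converts a panic in the underlying
// graph code into an ordinary error, so one poisoned vector cannot
// take down the whole ingest path.
func safeAdd(add func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("graph add panicked: %v", r)
		}
	}()
	add()
	return nil
}
```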
diff --git a/cmd/materializer/main.go b/cmd/materializer/main.go
new file mode 100644
index 0000000..85d65bc
--- /dev/null
+++ b/cmd/materializer/main.go
@@ -0,0 +1,78 @@
+// materializer — Go-side build_evidence_index runner. Reads source
+// JSONL streams in `data/_kb/`, transforms each row to an
+// EvidenceRecord, writes day-partitioned output under `data/evidence/`
+// + an audit-grade receipt under `reports/distillation/<ts>/`.
+//
+// Mirrors the Bun runner at scripts/distillation/build_evidence_index.ts
+// — both runtimes can run against the same root and produce
+// interoperable outputs (per ADR-001 #4: same logic, on-wire
+// JSON shape preserved).
+//
+// Usage:
+//
+// materializer # full run, write outputs
+// materializer -dry-run # count, no writes
+// materializer -root /home/profit/lakehouse # custom repo root
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "time"
+
+ "git.agentview.dev/profit/golangLAKEHOUSE/internal/materializer"
+)
+
+func main() {
+ root := flag.String("root", defaultRoot(), "lakehouse repo root (defaults to $LH_DISTILL_ROOT or current dir)")
+ dryRun := flag.Bool("dry-run", false, "count rows but do not write outputs")
+ flag.Parse()
+
+ recordedAt := time.Now().UTC().Format(time.RFC3339Nano)
+
+ res, err := materializer.MaterializeAll(materializer.MaterializeOptions{
+ Root: *root,
+ Transforms: materializer.Transforms,
+ RecordedAt: recordedAt,
+ DryRun: *dryRun,
+ })
+ if err != nil {
+ log.Fatalf("materializer: %v", err)
+ }
+
+ suffix := ""
+ if *dryRun {
+ suffix = " (DRY RUN)"
+ }
+ fmt.Printf("[evidence_index] %d read · %d written · %d skipped · %d deduped%s\n",
+ res.Totals.RowsRead, res.Totals.RowsWritten, res.Totals.RowsSkipped, res.Totals.RowsDeduped, suffix)
+ for _, s := range res.Sources {
+ if !s.RowsPresent {
+ fmt.Printf(" %s: (missing — skipped)\n", s.SourceFileRelPath)
+ continue
+ }
+ fmt.Printf(" %s: read=%d wrote=%d skip=%d dedup=%d\n",
+ s.SourceFileRelPath, s.RowsRead, s.RowsWritten, s.RowsSkipped, s.RowsDeduped)
+ }
+
+ if !*dryRun {
+ fmt.Printf("[evidence_index] receipt: %s\n", res.ReceiptPath)
+ fmt.Printf("[evidence_index] validation_pass=%v\n", res.Receipt.ValidationPass)
+ }
+
+ if !res.Receipt.ValidationPass {
+ os.Exit(1)
+ }
+}
+
+func defaultRoot() string {
+ if r := os.Getenv("LH_DISTILL_ROOT"); r != "" {
+ return r
+ }
+ if cwd, err := os.Getwd(); err == nil {
+ return cwd
+ }
+ return "."
+}
diff --git a/internal/materializer/canonical.go b/internal/materializer/canonical.go
new file mode 100644
index 0000000..9d56281
--- /dev/null
+++ b/internal/materializer/canonical.go
@@ -0,0 +1,93 @@
+// Package materializer ports scripts/distillation/transforms.ts +
+// build_evidence_index.ts to Go. Source rows in data/_kb/*.jsonl are
+// transformed into EvidenceRecord rows under data/evidence/YYYY/MM/DD/.
+//
+// Per ADR-001 #4: port LOGIC, not bit-identical reproducibility — but
+// on-wire JSON layout matches the TS shape so Bun and Go runs stay
+// interchangeable for tooling that reads either output.
+package materializer
+
+import (
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "sort"
+)
+
+// CanonicalSha256 returns the hex SHA-256 of `obj` after sorting all
+// object keys recursively. Matches the TS canonicalSha256 in
+// auditor/schemas/distillation/types.ts so a row hashed by either
+// runtime gets the same sig_hash.
+//
+// Determinism contract: identical input → identical hash, regardless
+// of the producer's serialization order.
+func CanonicalSha256(obj any) (string, error) {
+ ordered := orderKeys(obj)
+ buf, err := json.Marshal(ordered)
+ if err != nil {
+ return "", fmt.Errorf("canonical marshal: %w", err)
+ }
+ sum := sha256.Sum256(buf)
+ return hex.EncodeToString(sum[:]), nil
+}
+
+// orderKeys recursively sorts every map's keys. For arrays we keep the
+// element order (arrays are inherently ordered). Scalars pass through.
+func orderKeys(v any) any {
+ switch t := v.(type) {
+ case map[string]any:
+ keys := make([]string, 0, len(t))
+ for k := range t {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+ out := make(orderedMap, 0, len(keys))
+ for _, k := range keys {
+ out = append(out, kvPair{Key: k, Value: orderKeys(t[k])})
+ }
+ return out
+ case []any:
+ out := make([]any, len(t))
+ for i, e := range t {
+ out[i] = orderKeys(e)
+ }
+ return out
+ default:
+ return v
+ }
+}
+
+// orderedMap preserves insertion order on JSON marshal. We populate it
+// in sorted-key order so the produced bytes are stable.
+type orderedMap []kvPair
+
+type kvPair struct {
+ Key string
+ Value any
+}
+
+func (om orderedMap) MarshalJSON() ([]byte, error) {
+ if len(om) == 0 {
+ return []byte("{}"), nil
+ }
+ out := []byte{'{'}
+ for i, kv := range om {
+ if i > 0 {
+ out = append(out, ',')
+ }
+ k, err := json.Marshal(kv.Key)
+ if err != nil {
+ return nil, err
+ }
+ out = append(out, k...)
+ out = append(out, ':')
+ v, err := json.Marshal(kv.Value)
+ if err != nil {
+ return nil, err
+ }
+ out = append(out, v...)
+ }
+ out = append(out, '}')
+ return out, nil
+}
diff --git a/internal/materializer/canonical_test.go b/internal/materializer/canonical_test.go
new file mode 100644
index 0000000..8e2b2b4
--- /dev/null
+++ b/internal/materializer/canonical_test.go
@@ -0,0 +1,45 @@
+package materializer
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestCanonicalSha256_StableAcrossMapOrder(t *testing.T) {
+ a := map[string]any{"b": 2, "a": 1, "c": map[string]any{"y": "Y", "x": "X"}}
+ b := map[string]any{"a": 1, "c": map[string]any{"x": "X", "y": "Y"}, "b": 2}
+ hashA, err := CanonicalSha256(a)
+ if err != nil {
+ t.Fatalf("hash a: %v", err)
+ }
+ hashB, err := CanonicalSha256(b)
+ if err != nil {
+ t.Fatalf("hash b: %v", err)
+ }
+ if hashA != hashB {
+ t.Fatalf("identical objects produced different hashes:\n a=%s\n b=%s", hashA, hashB)
+ }
+ if len(hashA) != 64 || strings.Trim(hashA, "0123456789abcdef") != "" {
+ t.Fatalf("hash isn't a 64-char hex string: %q", hashA)
+ }
+}
+
+func TestCanonicalSha256_DistinctsDifferentInputs(t *testing.T) {
+ a := map[string]any{"k": "v"}
+ b := map[string]any{"k": "v2"}
+ hashA, _ := CanonicalSha256(a)
+ hashB, _ := CanonicalSha256(b)
+ if hashA == hashB {
+ t.Fatalf("different inputs collided: %s", hashA)
+ }
+}
+
+func TestCanonicalSha256_ArrayOrderMatters(t *testing.T) {
+ a := map[string]any{"k": []any{1, 2, 3}}
+ b := map[string]any{"k": []any{3, 2, 1}}
+ hashA, _ := CanonicalSha256(a)
+ hashB, _ := CanonicalSha256(b)
+ if hashA == hashB {
+ t.Fatal("array order should change the hash, but did not")
+ }
+}
diff --git a/internal/materializer/materializer.go b/internal/materializer/materializer.go
new file mode 100644
index 0000000..20f2214
--- /dev/null
+++ b/internal/materializer/materializer.go
@@ -0,0 +1,513 @@
+package materializer
+
+import (
+ "bufio"
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "time"
+)
+
+// MaterializeOptions drives MaterializeAll. Tests construct this with
+// a temp Root and override Transforms; the CLI uses defaults.
+type MaterializeOptions struct {
+ Root string // repo root; sources + outputs are relative
+ Transforms []TransformDef // override for tests
+ RecordedAt string // ISO 8601 — fixed for the run
+ DryRun bool // count but don't write
+}
+
+// SourceResult mirrors TS SourceResult.
+type SourceResult struct {
+ SourceFileRelPath string `json:"source_file_relpath"`
+ RowsPresent bool `json:"rows_present"`
+ RowsRead int `json:"rows_read"`
+ RowsWritten int `json:"rows_written"`
+ RowsSkipped int `json:"rows_skipped"`
+ RowsDeduped int `json:"rows_deduped"`
+ OutputFiles []string `json:"output_files"`
+}
+
+// MaterializeResult is what MaterializeAll returns. Receipt is the
+// authoritative "did the run succeed" surface — the rest is plumbing.
+type MaterializeResult struct {
+ Sources []SourceResult `json:"sources"`
+ Totals Totals `json:"totals"`
+ Receipt Receipt `json:"receipt"`
+ ReceiptPath string `json:"receipt_path"`
+ EvidenceDir string `json:"evidence_dir"`
+ SkipsPath string `json:"skips_path"`
+}
+
+// Totals — flat sum across sources.
+type Totals struct {
+ RowsRead int `json:"rows_read"`
+ RowsWritten int `json:"rows_written"`
+ RowsSkipped int `json:"rows_skipped"`
+ RowsDeduped int `json:"rows_deduped"`
+}
+
+// Receipt mirrors auditor/schemas/distillation/receipt.ts. Schema
+// version pinned to match the TS producer so consumers see the same
+// shape regardless of which runtime generated the run.
+const ReceiptSchemaVersion = 1
+
+type Receipt struct {
+ SchemaVersion int `json:"schema_version"`
+ Command string `json:"command"`
+ GitSHA string `json:"git_sha"`
+ GitBranch string `json:"git_branch,omitempty"`
+ GitDirty bool `json:"git_dirty"`
+ StartedAt string `json:"started_at"`
+ EndedAt string `json:"ended_at"`
+ DurationMs int64 `json:"duration_ms"`
+ InputFiles []FileReference `json:"input_files"`
+ OutputFiles []FileReference `json:"output_files"`
+ RecordCounts RecordCounts `json:"record_counts"`
+ ValidationPass bool `json:"validation_pass"`
+ Errors []string `json:"errors"`
+ Warnings []string `json:"warnings"`
+}
+
+type FileReference struct {
+ Path string `json:"path"`
+ SHA256 string `json:"sha256"`
+ Bytes int64 `json:"bytes"`
+}
+
+type RecordCounts struct {
+ In int `json:"in"`
+ Out int `json:"out"`
+ Skipped int `json:"skipped"`
+ Deduped int `json:"deduped"`
+}
+
+// SkipRecord is one row in distillation_skips.jsonl. Operators read
+// this stream when a run reports rows_skipped > 0.
+type SkipRecord struct {
+ SourceFile string `json:"source_file"`
+ LineOffset int64 `json:"line_offset"`
+ Errors []string `json:"errors"`
+ SigHash string `json:"sig_hash,omitempty"`
+ RecordedAt string `json:"recorded_at"`
+}
+
+// MaterializeAll iterates Transforms[], reads each source JSONL,
+// transforms each row, validates, writes to date-partitioned output.
+// Returns a Receipt whose ValidationPass tells the caller whether all
+// rows survived validation.
+func MaterializeAll(opts MaterializeOptions) (MaterializeResult, error) {
+ if opts.RecordedAt == "" {
+ return MaterializeResult{}, errors.New("MaterializeOptions.RecordedAt required")
+ }
+ if opts.Root == "" {
+ return MaterializeResult{}, errors.New("MaterializeOptions.Root required")
+ }
+ if !validISOTimestamp(opts.RecordedAt) {
+ return MaterializeResult{}, fmt.Errorf("RecordedAt not ISO 8601: %s", opts.RecordedAt)
+ }
+ transforms := opts.Transforms
+ if transforms == nil {
+ transforms = Transforms
+ }
+
+ evidenceDir := filepath.Join(opts.Root, "data", "evidence")
+ skipsPath := filepath.Join(opts.Root, "data", "_kb", "distillation_skips.jsonl")
+ reportsDir := filepath.Join(opts.Root, "reports", "distillation")
+
+ startedMs := time.Now().UnixMilli()
+ sources := make([]SourceResult, 0, len(transforms))
+ for _, t := range transforms {
+ sr, err := processSource(t, opts, evidenceDir, skipsPath)
+ if err != nil {
+ return MaterializeResult{}, fmt.Errorf("processSource %s: %w", t.SourceFileRelPath, err)
+ }
+ sources = append(sources, sr)
+ }
+
+ totals := Totals{}
+ for _, s := range sources {
+ totals.RowsRead += s.RowsRead
+ totals.RowsWritten += s.RowsWritten
+ totals.RowsSkipped += s.RowsSkipped
+ totals.RowsDeduped += s.RowsDeduped
+ }
+
+ endedAt := time.Now().UTC().Format(time.RFC3339Nano)
+ durationMs := time.Now().UnixMilli() - startedMs
+
+ inputFiles := make([]FileReference, 0)
+ for _, s := range sources {
+ if !s.RowsPresent {
+ continue
+ }
+ path := filepath.Join(opts.Root, s.SourceFileRelPath)
+ ref, err := fileReferenceAt(path, s.SourceFileRelPath)
+ if err == nil {
+ inputFiles = append(inputFiles, ref)
+ }
+ }
+ outputFiles := make([]FileReference, 0)
+ for _, s := range sources {
+ for _, p := range s.OutputFiles {
+ rel := strings.TrimPrefix(p, opts.Root+string(os.PathSeparator))
+ ref, err := fileReferenceAt(p, rel)
+ if err == nil {
+ outputFiles = append(outputFiles, ref)
+ }
+ }
+ }
+
+ var (
+ errs []string
+ warnings []string
+ )
+ for _, s := range sources {
+ if !s.RowsPresent {
+ warnings = append(warnings, fmt.Sprintf("%s: source file not found (skipped)", s.SourceFileRelPath))
+ }
+ if s.RowsSkipped > 0 {
+ warnings = append(warnings, fmt.Sprintf("%s: %d rows skipped (validation/parse errors)", s.SourceFileRelPath, s.RowsSkipped))
+ }
+ }
+
+ receipt := Receipt{
+ SchemaVersion: ReceiptSchemaVersion,
+ Command: commandLineOf(opts),
+ GitSHA: getGitSHA(opts.Root),
+ GitBranch: getGitBranch(opts.Root),
+ GitDirty: getGitDirty(opts.Root),
+ StartedAt: opts.RecordedAt,
+ EndedAt: endedAt,
+ DurationMs: durationMs,
+ InputFiles: inputFiles,
+ OutputFiles: outputFiles,
+ RecordCounts: RecordCounts{
+ In: totals.RowsRead,
+ Out: totals.RowsWritten,
+ Skipped: totals.RowsSkipped,
+ Deduped: totals.RowsDeduped,
+ },
+ ValidationPass: totals.RowsSkipped == 0,
+ Errors: nilToEmpty(errs),
+ Warnings: nilToEmpty(warnings),
+ }
+
+ stamp := strings.NewReplacer(":", "-", ".", "-").Replace(endedAt)
+ receiptDir := filepath.Join(reportsDir, stamp)
+ receiptPath := filepath.Join(receiptDir, "receipt.json")
+ if !opts.DryRun {
+ if err := os.MkdirAll(receiptDir, 0o755); err != nil {
+ return MaterializeResult{}, fmt.Errorf("mkdir receipt dir: %w", err)
+ }
+ buf, err := json.MarshalIndent(receipt, "", " ")
+ if err != nil {
+ return MaterializeResult{}, fmt.Errorf("marshal receipt: %w", err)
+ }
+ buf = append(buf, '\n')
+ if err := os.WriteFile(receiptPath, buf, 0o644); err != nil {
+ return MaterializeResult{}, fmt.Errorf("write receipt: %w", err)
+ }
+ }
+
+ return MaterializeResult{
+ Sources: sources,
+ Totals: totals,
+ Receipt: receipt,
+ ReceiptPath: receiptPath,
+ EvidenceDir: evidenceDir,
+ SkipsPath: skipsPath,
+ }, nil
+}
+
+// processSource reads, transforms, validates, and writes a single
+// source JSONL.
+func processSource(t TransformDef, opts MaterializeOptions, evidenceDir, skipsPath string) (SourceResult, error) {
+ srcPath := filepath.Join(opts.Root, t.SourceFileRelPath)
+ res := SourceResult{SourceFileRelPath: t.SourceFileRelPath}
+
+ info, err := os.Stat(srcPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return res, nil
+ }
+ return res, fmt.Errorf("stat %s: %w", srcPath, err)
+ }
+ if info.IsDir() {
+ return res, fmt.Errorf("%s is a directory, not a file", srcPath)
+ }
+ res.RowsPresent = true
+
+ partition := isoDatePartition(opts.RecordedAt)
+ stem := stemFor(t.SourceFileRelPath)
+ outDir := filepath.Join(evidenceDir, partition)
+ outPath := filepath.Join(outDir, stem+".jsonl")
+ if !opts.DryRun {
+ if err := os.MkdirAll(outDir, 0o755); err != nil {
+ return res, fmt.Errorf("mkdir output dir: %w", err)
+ }
+ }
+
+ seen, err := loadSeenHashes(outPath)
+ if err != nil {
+ return res, fmt.Errorf("load seen hashes: %w", err)
+ }
+
+ f, err := os.Open(srcPath)
+ if err != nil {
+ return res, fmt.Errorf("open %s: %w", srcPath, err)
+ }
+ defer f.Close()
+
+ var (
+ rowsToWrite []byte
+ skipsToWrite []byte
+ )
+
+ scanner := bufio.NewScanner(f)
+ scanner.Buffer(make([]byte, 0, 1<<16), 1<<24)
+ lineOffset := int64(-1)
+ for scanner.Scan() {
+ lineOffset++
+ raw := scanner.Bytes()
+ if len(raw) == 0 {
+ continue
+ }
+ res.RowsRead++
+
+ var row map[string]any
+ if err := json.Unmarshal(raw, &row); err != nil {
+ res.RowsSkipped++
+ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
+ SourceFile: t.SourceFileRelPath,
+ LineOffset: lineOffset,
+ Errors: []string{"JSON.parse failed: " + trim(err.Error(), 200)},
+ RecordedAt: opts.RecordedAt,
+ })
+ continue
+ }
+
+ sigHash, err := CanonicalSha256(row)
+ if err != nil {
+ res.RowsSkipped++
+ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
+ SourceFile: t.SourceFileRelPath,
+ LineOffset: lineOffset,
+ Errors: []string{"sig_hash compute failed: " + trim(err.Error(), 200)},
+ RecordedAt: opts.RecordedAt,
+ })
+ continue
+ }
+ if _, dup := seen[sigHash]; dup {
+ res.RowsDeduped++
+ continue
+ }
+ seen[sigHash] = struct{}{}
+
+ rec := t.Transform(TransformInput{
+ Row: row,
+ LineOffset: lineOffset,
+ SourceFileRelPath: t.SourceFileRelPath,
+ RecordedAt: opts.RecordedAt,
+ SigHash: sigHash,
+ })
+ if rec == nil {
+ res.RowsSkipped++
+ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
+ SourceFile: t.SourceFileRelPath,
+ LineOffset: lineOffset,
+ Errors: []string{"transform returned nil"},
+ SigHash: sigHash,
+ RecordedAt: opts.RecordedAt,
+ })
+ continue
+ }
+
+ if vErrs := ValidateEvidenceRecord(*rec); len(vErrs) > 0 {
+ res.RowsSkipped++
+ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
+ SourceFile: t.SourceFileRelPath,
+ LineOffset: lineOffset,
+ Errors: vErrs,
+ SigHash: sigHash,
+ RecordedAt: opts.RecordedAt,
+ })
+ continue
+ }
+
+ buf, err := json.Marshal(rec)
+ if err != nil {
+ res.RowsSkipped++
+ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
+ SourceFile: t.SourceFileRelPath,
+ LineOffset: lineOffset,
+ Errors: []string{"marshal output: " + trim(err.Error(), 200)},
+ SigHash: sigHash,
+ RecordedAt: opts.RecordedAt,
+ })
+ continue
+ }
+ rowsToWrite = append(rowsToWrite, buf...)
+ rowsToWrite = append(rowsToWrite, '\n')
+ res.RowsWritten++
+ }
+ if err := scanner.Err(); err != nil {
+ return res, fmt.Errorf("scan %s: %w", srcPath, err)
+ }
+
+ if !opts.DryRun {
+ if len(rowsToWrite) > 0 {
+ if err := appendBytes(outPath, rowsToWrite); err != nil {
+ return res, fmt.Errorf("append output: %w", err)
+ }
+ res.OutputFiles = append(res.OutputFiles, outPath)
+ }
+ if len(skipsToWrite) > 0 {
+ if err := os.MkdirAll(filepath.Dir(skipsPath), 0o755); err != nil {
+ return res, fmt.Errorf("mkdir skips dir: %w", err)
+ }
+ if err := appendBytes(skipsPath, skipsToWrite); err != nil {
+ return res, fmt.Errorf("append skips: %w", err)
+ }
+ }
+ }
+
+ return res, nil
+}
+
+// loadSeenHashes reads sig_hashes from an existing day-partition output
+// file. Idempotency: a re-run that produces the same hash is a dedup
+// not a duplicate write.
+func loadSeenHashes(outPath string) (map[string]struct{}, error) {
+ seen := map[string]struct{}{}
+ f, err := os.Open(outPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return seen, nil
+ }
+ return nil, err
+ }
+ defer f.Close()
+ scanner := bufio.NewScanner(f)
+ scanner.Buffer(make([]byte, 0, 1<<16), 1<<24)
+ for scanner.Scan() {
+ raw := scanner.Bytes()
+ if len(raw) == 0 {
+ continue
+ }
+ var rec struct {
+ Provenance struct {
+ SigHash string `json:"sig_hash"`
+ } `json:"provenance"`
+ }
+ if err := json.Unmarshal(raw, &rec); err != nil {
+ continue // malformed line; ignore
+ }
+ if rec.Provenance.SigHash != "" {
+ seen[rec.Provenance.SigHash] = struct{}{}
+ }
+ }
+ return seen, scanner.Err()
+}
+
+func appendSkip(buf []byte, sk SkipRecord) []byte {
+ out, err := json.Marshal(sk)
+ if err != nil {
+ // Should never happen for the well-typed SkipRecord — fall back
+ // to a sentinel so the materializer doesn't drop the skip silently.
+ // Marshal the error text separately so quotes or backslashes in it
+ // can't break the hand-built JSON line.
+ msg, _ := json.Marshal("marshal_skip_failed:" + err.Error())
+ return append(buf, []byte(fmt.Sprintf(`{"source_file":%q,"line_offset":%d,"errors":[%s],"recorded_at":%q}`+"\n",
+ sk.SourceFile, sk.LineOffset, msg, sk.RecordedAt))...)
+ }
+ buf = append(buf, out...)
+ buf = append(buf, '\n')
+ return buf
+}
+
+func appendBytes(path string, data []byte) error {
+ f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ _, err = f.Write(data)
+ return err
+}
+
+func isoDatePartition(iso string) string {
+ t, err := time.Parse(time.RFC3339Nano, iso)
+ if err != nil {
+ t, err = time.Parse(time.RFC3339, iso)
+ }
+ if err != nil {
+ // Fallback: TS would have produced "NaN/NaN/NaN" — we use
+ // "0000/00/00" which is at least a valid path. Materializer
+ // fails its own RecordedAt validation before reaching here.
+ return "0000/00/00"
+ }
+ t = t.UTC()
+ return fmt.Sprintf("%04d/%02d/%02d", t.Year(), int(t.Month()), t.Day())
+}
+
+func fileReferenceAt(path, relpath string) (FileReference, error) {
+ f, err := os.Open(path)
+ if err != nil {
+ return FileReference{}, err
+ }
+ defer f.Close()
+ hasher := sha256.New()
+ n, err := io.Copy(hasher, f)
+ if err != nil {
+ return FileReference{}, err
+ }
+ return FileReference{
+ Path: relpath,
+ SHA256: hex.EncodeToString(hasher.Sum(nil)),
+ Bytes: n,
+ }, nil
+}
+
+func getGitSHA(root string) string {
+ out, err := exec.Command("git", "-C", root, "rev-parse", "HEAD").Output()
+ if err != nil {
+ return strings.Repeat("0", 40)
+ }
+ return strings.TrimSpace(string(out))
+}
+
+func getGitBranch(root string) string {
+ out, err := exec.Command("git", "-C", root, "rev-parse", "--abbrev-ref", "HEAD").Output()
+ if err != nil {
+ return ""
+ }
+ return strings.TrimSpace(string(out))
+}
+
+func getGitDirty(root string) bool {
+ out, err := exec.Command("git", "-C", root, "status", "--porcelain").Output()
+ if err != nil {
+ return false
+ }
+ return strings.TrimSpace(string(out)) != ""
+}
+
+func commandLineOf(opts MaterializeOptions) string {
+ cmd := "go run ./cmd/materializer"
+ if opts.DryRun {
+ cmd += " --dry-run"
+ }
+ return cmd
+}
+
+// nilToEmpty normalizes a nil slice to an empty one so the receipt
+// marshals errors/warnings as [] rather than null.
+func nilToEmpty(s []string) []string {
+ if len(s) == 0 {
+ return []string{}
+ }
+ return s
+}
diff --git a/internal/materializer/materializer_test.go b/internal/materializer/materializer_test.go
new file mode 100644
index 0000000..a24bf07
--- /dev/null
+++ b/internal/materializer/materializer_test.go
@@ -0,0 +1,218 @@
+package materializer
+
+import (
+ "bufio"
+ "encoding/json"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+)
+
+// TestMaterializeAll_RoundTrip writes a fixture source jsonl, runs the
+// materializer, and checks every contract: receipt, output rows,
+// idempotency on second run.
+func TestMaterializeAll_RoundTrip(t *testing.T) {
+ root := t.TempDir()
+ mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
+ `{"run_id":"r1","source_label":"lab-a","created_at":"2026-04-26T00:00:00Z","extractor":"qwen3.5:latest","text":"first"}
+{"run_id":"r2","source_label":"lab-b","created_at":"2026-04-26T01:00:00Z","extractor":"qwen3.5:latest","text":"second"}`)
+
+ transforms := []TransformDef{
+ {SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
+ }
+
+ first, err := MaterializeAll(MaterializeOptions{
+ Root: root,
+ Transforms: transforms,
+ RecordedAt: "2026-05-02T00:00:00Z",
+ })
+ if err != nil {
+ t.Fatalf("first run: %v", err)
+ }
+ if !first.Receipt.ValidationPass {
+ t.Errorf("first run should pass validation. errors=%v warnings=%v", first.Receipt.Errors, first.Receipt.Warnings)
+ }
+ if first.Totals.RowsRead != 2 || first.Totals.RowsWritten != 2 || first.Totals.RowsSkipped != 0 {
+ t.Errorf("first run counts wrong: %+v", first.Totals)
+ }
+ if first.Totals.RowsDeduped != 0 {
+ t.Errorf("first run should have 0 dedupes, got %d", first.Totals.RowsDeduped)
+ }
+
+ outPath := filepath.Join(root, "data/evidence/2026/05/02/distilled_facts.jsonl")
+ rows := readJSONL(t, outPath)
+ if len(rows) != 2 {
+ t.Fatalf("expected 2 output rows, got %d", len(rows))
+ }
+ for _, r := range rows {
+ if r["schema_version"].(float64) != 1 {
+ t.Errorf("schema_version wrong: %v", r["schema_version"])
+ }
+ prov := r["provenance"].(map[string]any)
+ if prov["source_file"] != "data/_kb/distilled_facts.jsonl" {
+ t.Errorf("provenance.source_file: %v", prov["source_file"])
+ }
+ if prov["recorded_at"] != "2026-05-02T00:00:00Z" {
+ t.Errorf("provenance.recorded_at: %v", prov["recorded_at"])
+ }
+ }
+
+ // Second run with identical input + RecordedAt → all rows should
+ // dedup, nothing newly written.
+ second, err := MaterializeAll(MaterializeOptions{
+ Root: root,
+ Transforms: transforms,
+ RecordedAt: "2026-05-02T00:00:00Z",
+ })
+ if err != nil {
+ t.Fatalf("second run: %v", err)
+ }
+ if second.Totals.RowsRead != 2 || second.Totals.RowsWritten != 0 || second.Totals.RowsDeduped != 2 {
+ t.Errorf("idempotency broken; second run counts: %+v", second.Totals)
+ }
+ rows2 := readJSONL(t, outPath)
+ if len(rows2) != 2 {
+ t.Fatalf("output file grew on idempotent rerun: %d rows", len(rows2))
+ }
+}
+
+func TestMaterializeAll_BadJSONLineGoesToSkips(t *testing.T) {
+ root := t.TempDir()
+ mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
+ `{"run_id":"r1","source_label":"a","created_at":"2026-04-26T00:00:00Z","extractor":"q","text":"t"}
+not-json
+{"run_id":"r2","source_label":"b","created_at":"2026-04-26T01:00:00Z","extractor":"q","text":"t2"}`)
+
+ transforms := []TransformDef{
+ {SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
+ }
+ res, err := MaterializeAll(MaterializeOptions{
+ Root: root,
+ Transforms: transforms,
+ RecordedAt: "2026-05-02T00:00:00Z",
+ })
+ if err != nil {
+ t.Fatalf("run: %v", err)
+ }
+ if res.Totals.RowsWritten != 2 {
+ t.Errorf("good rows should still pass through; written=%d", res.Totals.RowsWritten)
+ }
+ if res.Totals.RowsSkipped != 1 {
+ t.Errorf("bad-json row should be in skipped bucket; got %d", res.Totals.RowsSkipped)
+ }
+ if res.Receipt.ValidationPass {
+ t.Errorf("validation_pass should be false when any row was skipped")
+ }
+
+ skipsPath := filepath.Join(root, "data/_kb/distillation_skips.jsonl")
+ skips := readJSONL(t, skipsPath)
+ if len(skips) != 1 {
+ t.Fatalf("expected 1 skip record, got %d", len(skips))
+ }
+ if !strings.Contains(toJSON(t, skips[0]), "JSON.parse failed") {
+ t.Errorf("skip record should mention parse failure: %v", skips[0])
+ }
+}
+
+func TestMaterializeAll_DryRunWritesNothing(t *testing.T) {
+ root := t.TempDir()
+ mustWriteFixture(t, root, "data/_kb/distilled_facts.jsonl",
+ `{"run_id":"r1","source_label":"a","created_at":"2026-04-26T00:00:00Z","extractor":"q","text":"t"}`)
+
+ transforms := []TransformDef{
+ {SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
+ }
+ res, err := MaterializeAll(MaterializeOptions{
+ Root: root,
+ Transforms: transforms,
+ RecordedAt: "2026-05-02T00:00:00Z",
+ DryRun: true,
+ })
+ if err != nil {
+ t.Fatalf("dry run: %v", err)
+ }
+ if res.Totals.RowsRead != 1 || res.Totals.RowsWritten != 1 {
+ t.Errorf("dry run should still count, got %+v", res.Totals)
+ }
+ outPath := filepath.Join(root, "data/evidence/2026/05/02/distilled_facts.jsonl")
+ if _, err := os.Stat(outPath); !os.IsNotExist(err) {
+ t.Errorf("dry run wrote output file (should not): err=%v", err)
+ }
+ if _, err := os.Stat(res.ReceiptPath); !os.IsNotExist(err) {
+ t.Errorf("dry run wrote receipt (should not): err=%v", err)
+ }
+}
+
+func TestMaterializeAll_MissingSourceTalliedAsWarning(t *testing.T) {
+ root := t.TempDir()
+ transforms := []TransformDef{
+ {SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
+ }
+ res, err := MaterializeAll(MaterializeOptions{
+ Root: root,
+ Transforms: transforms,
+ RecordedAt: "2026-05-02T00:00:00Z",
+ })
+ if err != nil {
+ t.Fatalf("run: %v", err)
+ }
+ if res.Sources[0].RowsPresent {
+ t.Errorf("expected rows_present=false")
+ }
+ if !res.Receipt.ValidationPass {
+ t.Errorf("missing source ≠ validation failure; got pass=%v warnings=%v", res.Receipt.ValidationPass, res.Receipt.Warnings)
+ }
+ if len(res.Receipt.Warnings) == 0 {
+ t.Errorf("missing source should produce a warning")
+ }
+}
+
+// ─── Helpers ─────────────────────────────────────────────────────
+
+func mustWriteFixture(t *testing.T, root, relpath, content string) {
+ t.Helper()
+ full := filepath.Join(root, relpath)
+ if err := os.MkdirAll(filepath.Dir(full), 0o755); err != nil {
+ t.Fatalf("mkdir: %v", err)
+ }
+ if err := os.WriteFile(full, []byte(content), 0o644); err != nil {
+ t.Fatalf("write fixture: %v", err)
+ }
+}
+
+func readJSONL(t *testing.T, path string) []map[string]any {
+ t.Helper()
+ f, err := os.Open(path)
+ if err != nil {
+ t.Fatalf("open %s: %v", path, err)
+ }
+ defer f.Close()
+ var out []map[string]any
+ sc := bufio.NewScanner(f)
+ sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
+ for sc.Scan() {
+ line := sc.Bytes()
+ if len(line) == 0 {
+ continue
+ }
+ var row map[string]any
+ if err := json.Unmarshal(line, &row); err != nil {
+ t.Fatalf("parse %s: %v", path, err)
+ }
+ out = append(out, row)
+ }
+ if err := sc.Err(); err != nil {
+ t.Fatalf("scan %s: %v", path, err)
+ }
+ return out
+}
+
+func toJSON(t *testing.T, v any) string {
+ t.Helper()
+ b, err := json.Marshal(v)
+ if err != nil {
+ t.Fatalf("marshal: %v", err)
+ }
+ return string(b)
+}
diff --git a/internal/materializer/transforms.go b/internal/materializer/transforms.go
new file mode 100644
index 0000000..7ae4b08
--- /dev/null
+++ b/internal/materializer/transforms.go
@@ -0,0 +1,653 @@
+package materializer
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+ "time"
+
+ "git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
+)
+
+// TransformInput is what each TransformFn receives. Mirrors the TS
+// TransformInput shape — every field is supplied by the materializer
+// driver, not by the transform.
+type TransformInput struct {
+ Row map[string]any
+ LineOffset int64
+ SourceFileRelPath string // relative to repo root
+ RecordedAt string // ISO 8601, caller's "now"
+ SigHash string // canonical sha256 of row, pre-computed
+}
+
+// TransformFn maps a single source row to an EvidenceRecord. Returning
+// nil signals "skip this row" — the materializer logs a deterministic
+// skip with no record produced.
+//
+// Transforms must be pure: no I/O, no clock reads, no model calls.
+// Any time component must come from the row itself or RecordedAt.
+type TransformFn func(in TransformInput) *distillation.EvidenceRecord
+
+// TransformDef binds a source-file path to its TransformFn. Order in
+// Transforms[] has no effect (each runs against its own SourceFile).
+type TransformDef struct {
+ SourceFileRelPath string
+ Transform TransformFn
+}
+
+// ─── Transforms — one per source-file. Ports of TRANSFORMS[] in
+// scripts/distillation/transforms.ts. Tier 1 first (validated), Tier 2
+// second (untested but in-shape). ────────────────────────────────────
+
+// Transforms is the canonical list. CLI passes this to MaterializeAll.
+// Adding a new source: append a TransformDef.
+var Transforms = []TransformDef{
+ // ── Tier 1: validated 100% in Phase 1 ─────────────────────────
+ {SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
+ {SourceFileRelPath: "data/_kb/distilled_procedures.jsonl", Transform: extractorTransform},
+ {SourceFileRelPath: "data/_kb/distilled_config_hints.jsonl", Transform: extractorTransform},
+ {SourceFileRelPath: "data/_kb/contract_analyses.jsonl", Transform: contractAnalysesTransform},
+ {SourceFileRelPath: "data/_kb/mode_experiments.jsonl", Transform: modeExperimentsTransform},
+ {SourceFileRelPath: "data/_kb/scrum_reviews.jsonl", Transform: scrumReviewsTransform},
+ {SourceFileRelPath: "data/_kb/observer_escalations.jsonl", Transform: observerEscalationsTransform},
+ {SourceFileRelPath: "data/_kb/audit_facts.jsonl", Transform: auditFactsTransform},
+
+ // ── Tier 2: untested streams that still belong in EvidenceRecord ──
+ {SourceFileRelPath: "data/_kb/auto_apply.jsonl", Transform: autoApplyTransform},
+ {SourceFileRelPath: "data/_kb/observer_reviews.jsonl", Transform: observerReviewsTransform},
+ {SourceFileRelPath: "data/_kb/audits.jsonl", Transform: auditsTransform},
+ {SourceFileRelPath: "data/_kb/outcomes.jsonl", Transform: outcomesTransform},
+}
+
+// TransformByPath returns the TransformDef for a given source path,
+// or nil if no transform is registered. Matches the TS helper.
+func TransformByPath(relpath string) *TransformDef {
+ for i := range Transforms {
+ if Transforms[i].SourceFileRelPath == relpath {
+ return &Transforms[i]
+ }
+ }
+ return nil
+}
+
+// ─── Per-source transform implementations ─────────────────────────
+
+// extractorTransform powers the three distilled_* sources. Same shape:
+// LLM-extracted text with a model_name from `extractor`.
+func extractorTransform(in TransformInput) *distillation.EvidenceRecord {
+ stem := stemFor(in.SourceFileRelPath)
+ rec := distillation.EvidenceRecord{
+ RunID: strDefault(in.Row, "run_id", fmt.Sprintf("%s:%d", stem, in.LineOffset)),
+ TaskID: strDefault(in.Row, "source_label", fmt.Sprintf("%s:%d", stem, in.LineOffset)),
+ Timestamp: getString(in.Row, "created_at"),
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelName: getString(in.Row, "extractor"),
+ ModelRole: distillation.RoleExtractor,
+ ModelProvider: "ollama",
+ Text: getString(in.Row, "text"),
+ }
+ return &rec
+}
+
+// contractAnalysesTransform: per-permit executor with observer signals,
+// retrieval telemetry, and cost in micro-units that gets converted to
+// USD. Carries `contractor` in metadata.
+func contractAnalysesTransform(in TransformInput) *distillation.EvidenceRecord {
+ permitID := getString(in.Row, "permit_id")
+ tsStr := getString(in.Row, "ts")
+ tsMs := timeToMS(tsStr)
+
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("contract_analysis:%s:%d", permitID, tsMs),
+ TaskID: fmt.Sprintf("permit:%s", permitID),
+ Timestamp: tsStr,
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelRole: distillation.RoleExecutor,
+ Text: getString(in.Row, "analysis"),
+ }
+
+ if rc := buildRetrievedContext(map[string]any{
+ "matrix_corpora": objectKeys(in.Row, "matrix_corpora"),
+ "matrix_hits": in.Row["matrix_hits"],
+ }); rc != nil {
+ rec.RetrievedContext = rc
+ }
+
+ if notes := flattenNotes(in.Row, "observer_notes"); len(notes) > 0 {
+ rec.ObserverNotes = notes
+ }
+ if v, ok := in.Row["observer_verdict"].(string); ok && v != "" {
+ rec.ObserverVerdict = distillation.ObserverVerdict(v)
+ }
+ if c, ok := numFloat(in.Row, "observer_conf"); ok {
+ rec.ObserverConfidence = c
+ }
+ if ok, present := boolField(in.Row, "ok"); present && ok {
+ rec.SuccessMarkers = []string{"matrix_hits_above_threshold"}
+ }
+ verdict := getString(in.Row, "observer_verdict")
+ okVal, _ := boolField(in.Row, "ok") // the value itself; false when the key is missing
+ if !okVal || verdict == "reject" {
+ rec.FailureMarkers = []string{"observer_rejected"}
+ }
+ if cost, ok := numFloat(in.Row, "cost"); ok {
+ rec.CostUSD = cost / 1_000_000.0
+ }
+ if d, ok := numInt(in.Row, "duration_ms"); ok {
+ rec.LatencyMs = d
+ }
+ if contractor := getString(in.Row, "contractor"); contractor != "" {
+ rec.Metadata = map[string]any{"contractor": contractor}
+ }
+ return &rec
+}
+
+// modeExperimentsTransform: mode_runner per-call traces. Provider
+// derived from model name shape ("/" → openrouter, else ollama_cloud).
+func modeExperimentsTransform(in TransformInput) *distillation.EvidenceRecord {
+ tsStr := getString(in.Row, "ts")
+ tsMs := timeToMS(tsStr)
+ filePath := getString(in.Row, "file_path")
+ keySuffix := filePath
+ if keySuffix == "" {
+ keySuffix = fmt.Sprintf("%d", in.LineOffset)
+ }
+ model := getString(in.Row, "model")
+ provider := "ollama_cloud"
+ if strings.Contains(model, "/") {
+ provider = "openrouter"
+ }
+
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("mode_exec:%d:%s", tsMs, keySuffix),
+ TaskID: getString(in.Row, "task_class"),
+ Timestamp: tsStr,
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelName: model,
+ ModelRole: distillation.RoleExecutor,
+ ModelProvider: provider,
+ Text: getString(in.Row, "response"),
+ }
+ if d, ok := numInt(in.Row, "latency_ms"); ok {
+ rec.LatencyMs = d
+ }
+ if filePath != "" {
+ rec.SourceFiles = []string{filePath}
+ }
+ if sources, ok := in.Row["sources"].(map[string]any); ok {
+ rec.RetrievedContext = buildRetrievedContext(map[string]any{
+ "matrix_corpora": sources["matrix_corpus"],
+ "matrix_chunks_kept": sources["matrix_chunks_kept"],
+ "matrix_chunks_dropped": sources["matrix_chunks_dropped"],
+ "pathway_fingerprints_seen": sources["bug_fingerprints_count"],
+ })
+ }
+ return &rec
+}
+
+// scrumReviewsTransform: per-file scrum review traces. Success marker
+// captures the attempt number when accepted.
+func scrumReviewsTransform(in TransformInput) *distillation.EvidenceRecord {
+ reviewedAt := getString(in.Row, "reviewed_at")
+ tsMs := timeToMS(reviewedAt)
+ file := getString(in.Row, "file")
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("scrum:%d:%s", tsMs, file),
+ TaskID: fmt.Sprintf("scrum_review:%s", file),
+ Timestamp: reviewedAt,
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelName: getString(in.Row, "accepted_model"),
+ ModelRole: distillation.RoleExecutor,
+ Text: getString(in.Row, "suggestions_preview"),
+ }
+ if file != "" {
+ rec.SourceFiles = []string{file}
+ }
+ if a, ok := numInt(in.Row, "accepted_on_attempt"); ok && a > 0 {
+ rec.SuccessMarkers = []string{fmt.Sprintf("accepted_on_attempt_%d", a)}
+ }
+ return &rec
+}
+
+// observerEscalationsTransform: reviewer-class trace; carries token
+// counts so the SFT exporter sees real usage signals.
+func observerEscalationsTransform(in TransformInput) *distillation.EvidenceRecord {
+ tsStr := getString(in.Row, "ts")
+ tsMs := timeToMS(tsStr)
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("obs_esc:%d:%s", tsMs, getString(in.Row, "sig_hash")),
+ TaskID: fmt.Sprintf("observer_escalation:%s", strDefault(in.Row, "cluster_endpoint", "?")),
+ Timestamp: tsStr,
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelRole: distillation.RoleReviewer,
+ Text: getString(in.Row, "analysis"),
+ }
+ if pt, ok := numInt(in.Row, "prompt_tokens"); ok {
+ rec.PromptTokens = pt
+ }
+ if ct, ok := numInt(in.Row, "completion_tokens"); ok {
+ rec.CompletionTokens = ct
+ }
+ return &rec
+}
+
+// auditFactsTransform: per-PR auditor extraction. Text is a compact
+// JSON summary of array lengths (facts/entities/relationships).
+func auditFactsTransform(in TransformInput) *distillation.EvidenceRecord {
+ headSHA := getString(in.Row, "head_sha")
+ prNumber := getString(in.Row, "pr_number")
+ body, _ := json.Marshal(map[string]any{
+ "facts": arrayLen(in.Row, "facts"),
+ "entities": arrayLen(in.Row, "entities"),
+ "relationships": arrayLen(in.Row, "relationships"),
+ })
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("audit_facts:%s:%d", headSHA, in.LineOffset),
+ TaskID: fmt.Sprintf("pr:%s", prNumber),
+ Timestamp: getString(in.Row, "extracted_at"),
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelName: getString(in.Row, "extractor"),
+ ModelRole: distillation.RoleExtractor,
+ Text: string(body),
+ }
+ return &rec
+}
+
+// autoApplyTransform: applier traces. Pure metadata — no text payload.
+// Deterministic ts fallback to RecordedAt when the row lacks one
+// (matches TS comment about wall-clock leak fix).
+func autoApplyTransform(in TransformInput) *distillation.EvidenceRecord {
+ ts := getString(in.Row, "ts")
+ if ts == "" {
+ ts = in.RecordedAt
+ }
+ tsMs := timeToMS(ts)
+ action := strDefault(in.Row, "action", "unknown")
+ file := getString(in.Row, "file")
+ keySuffix := file
+ if keySuffix == "" {
+ keySuffix = fmt.Sprintf("%d", in.LineOffset)
+ }
+
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("auto_apply:%d:%s", tsMs, keySuffix),
+ TaskID: fmt.Sprintf("auto_apply:%s", strDefault(in.Row, "file", "?")),
+ Timestamp: ts,
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelRole: distillation.RoleApplier,
+ }
+ if file != "" {
+ rec.SourceFiles = []string{file}
+ }
+ if action == "committed" {
+ rec.SuccessMarkers = []string{"committed"}
+ }
+ if strings.Contains(action, "reverted") {
+ rec.FailureMarkers = []string{action}
+ }
+ return &rec
+}
+
+// observerReviewsTransform: reviewer-class. Falls back from `ts` to
+// `reviewed_at`. Mirrors observer_escalations but carries verdict +
+// confidence + free-form notes.
+func observerReviewsTransform(in TransformInput) *distillation.EvidenceRecord {
+ ts := getString(in.Row, "ts")
+ if ts == "" {
+ ts = getString(in.Row, "reviewed_at")
+ }
+ tsMs := timeToMS(ts)
+ file := getString(in.Row, "file")
+
+ keySuffix := file
+ if keySuffix == "" {
+ keySuffix = fmt.Sprintf("%d", in.LineOffset)
+ }
+ taskID := fmt.Sprintf("observer_review:%s", keySuffix)
+ if file == "" {
+ taskID = fmt.Sprintf("observer_review:%d", in.LineOffset)
+ }
+
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("obs_rev:%d:%s", tsMs, keySuffix),
+ TaskID: taskID,
+ Timestamp: ts,
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelRole: distillation.RoleReviewer,
+ }
+ if v, ok := in.Row["verdict"].(string); ok && v != "" {
+ rec.ObserverVerdict = distillation.ObserverVerdict(v)
+ }
+ if c, ok := numFloat(in.Row, "confidence"); ok {
+ rec.ObserverConfidence = c
+ }
+ if notes := flattenNotes(in.Row, "notes"); len(notes) > 0 {
+ rec.ObserverNotes = notes
+ }
+ if text := getString(in.Row, "notes"); text != "" {
+ rec.Text = text
+ } else if review := getString(in.Row, "review"); review != "" {
+ rec.Text = review
+ }
+ return &rec
+}
+
+// auditsTransform: per-finding auditor stream. Severity drives the
+// success/failure marker shape — info/low → success, medium →
+// non-fatal failure, high/critical → blocking failure.
+//
+// Note on determinism: the TS port falls back to `new Date().toISOString()`
+// when `ts` is missing, which is non-deterministic. The Go port uses
+// RecordedAt as the deterministic fallback (matches the
+// auto_apply fix pattern).
+func auditsTransform(in TransformInput) *distillation.EvidenceRecord {
+ sev := strings.ToLower(strDefault(in.Row, "severity", "unknown"))
+ minor := sev == "info" || sev == "low"
+ blocking := sev == "high" || sev == "critical"
+ medium := sev == "medium"
+
+ findingID := getString(in.Row, "finding_id")
+ keySuffix := findingID
+ if keySuffix == "" {
+ keySuffix = fmt.Sprintf("%d", in.LineOffset)
+ }
+ phase := getString(in.Row, "phase")
+ taskID := "audit_finding"
+ if phase != "" {
+ taskID = fmt.Sprintf("phase:%s", phase)
+ }
+
+ ts := getString(in.Row, "ts")
+ if ts == "" {
+ ts = in.RecordedAt
+ }
+
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("audit_finding:%s", keySuffix),
+ TaskID: taskID,
+ Timestamp: ts,
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelRole: distillation.RoleReviewer,
+ }
+ if minor {
+ rec.SuccessMarkers = []string{fmt.Sprintf("audit_severity_%s", sev)}
+ }
+ if blocking {
+ rec.FailureMarkers = []string{fmt.Sprintf("audit_severity_%s", sev)}
+ } else if medium {
+ rec.FailureMarkers = []string{"audit_severity_medium"}
+ }
+ if ev, ok := in.Row["evidence"].(string); ok && ev != "" {
+ rec.Text = ev
+ } else {
+ rec.Text = getString(in.Row, "resolution")
+ }
+ return &rec
+}
+
+// outcomesTransform: command-runner outcome stream. Latency from
+// elapsed_secs (× 1000), success when all events ok.
+func outcomesTransform(in TransformInput) *distillation.EvidenceRecord {
+ rec := distillation.EvidenceRecord{
+ RunID: fmt.Sprintf("outcome:%s", strDefault(in.Row, "run_id", fmt.Sprintf("%d", in.LineOffset))),
+ Timestamp: getString(in.Row, "created_at"),
+ SchemaVersion: distillation.EvidenceSchemaVersion,
+ Provenance: provenance(in),
+ ModelRole: distillation.RoleExecutor,
+ }
+ if sigHash := getString(in.Row, "sig_hash"); sigHash != "" {
+ rec.TaskID = fmt.Sprintf("outcome_sig:%s", sigHash)
+ } else {
+ rec.TaskID = fmt.Sprintf("outcome:%d", in.LineOffset)
+ }
+ if elapsed, ok := numFloat(in.Row, "elapsed_secs"); ok {
+ rec.LatencyMs = int64(elapsed*1000 + 0.5) // rounded
+ }
+ if okEv, ok1 := numInt(in.Row, "ok_events"); ok1 {
+ if total, ok2 := numInt(in.Row, "total_events"); ok2 {
+ if total > 0 && okEv == total {
+ rec.SuccessMarkers = []string{"all_events_ok"}
+ }
+ }
+ }
+ if g, ok := numInt(in.Row, "total_gap_signals"); ok {
+ vr := map[string]any{"gap_signals": g}
+ if c, ok2 := numInt(in.Row, "total_citations"); ok2 {
+ vr["citation_count"] = c
+ }
+ rec.ValidationResults = vr
+ }
+ return &rec
+}
+
+// ─── Helpers — coercion + extraction patterns shared by transforms ──
+
+func provenance(in TransformInput) distillation.Provenance {
+ return distillation.Provenance{
+ SourceFile: in.SourceFileRelPath,
+ LineOffset: in.LineOffset,
+ SigHash: in.SigHash,
+ RecordedAt: in.RecordedAt,
+ }
+}
+
+// stemFor extracts "distilled_facts" from "data/_kb/distilled_facts.jsonl".
+func stemFor(relpath string) string {
+ idx := strings.LastIndex(relpath, "/")
+ base := relpath
+ if idx >= 0 {
+ base = relpath[idx+1:]
+ }
+ return strings.TrimSuffix(base, ".jsonl")
+}
+
+// getString returns row[key] as a string, or "" if missing/wrong-type.
+func getString(row map[string]any, key string) string {
+ v, ok := row[key]
+ if !ok || v == nil {
+ return ""
+ }
+ switch t := v.(type) {
+ case string:
+ return t
+ case float64:
+ return fmt.Sprintf("%v", t)
+ case bool:
+ return fmt.Sprintf("%t", t)
+ default:
+ return fmt.Sprintf("%v", t)
+ }
+}
+
+// strDefault returns row[key] coerced to string, or fallback if empty/missing.
+func strDefault(row map[string]any, key, fallback string) string {
+ if s := getString(row, key); s != "" {
+ return s
+ }
+ return fallback
+}
+
+// numInt returns row[key] as int64. JSON numbers come in as float64.
+// Returns (val, true) when present and finite, else (0, false).
+func numInt(row map[string]any, key string) (int64, bool) {
+ v, ok := row[key]
+ if !ok || v == nil {
+ return 0, false
+ }
+ switch t := v.(type) {
+ case float64:
+ return int64(t), true
+ case int:
+ return int64(t), true
+ case int64:
+ return t, true
+ }
+ return 0, false
+}
+
+// numFloat returns row[key] as float64.
+func numFloat(row map[string]any, key string) (float64, bool) {
+ v, ok := row[key]
+ if !ok || v == nil {
+ return 0, false
+ }
+ switch t := v.(type) {
+ case float64:
+ return t, true
+ case int:
+ return float64(t), true
+ case int64:
+ return float64(t), true
+ }
+ return 0, false
+}
+
+// boolField returns (value, present). present=false when key missing
+// or non-bool.
+func boolField(row map[string]any, key string) (bool, bool) {
+ v, ok := row[key]
+ if !ok {
+ return false, false
+ }
+ if b, isBool := v.(bool); isBool {
+ return b, true
+ }
+ return false, false
+}
+
+// arrayLen returns len(row[key]) if it's an array, else 0.
+func arrayLen(row map[string]any, key string) int {
+ if a, ok := row[key].([]any); ok {
+ return len(a)
+ }
+ return 0
+}
+
+// objectKeys returns sorted keys of row[key] when it's a map. Returns
+// nil when missing or non-map (so callers can treat empty corpus list
+// as "field absent").
+func objectKeys(row map[string]any, key string) []string {
+ m, ok := row[key].(map[string]any)
+ if !ok || len(m) == 0 {
+ return nil
+ }
+ keys := make([]string, 0, len(m))
+ for k := range m {
+ keys = append(keys, k)
+ }
+ // Sort for determinism — TS Object.keys() order is insertion-order
+ // in modern engines but Go map iteration is randomized.
+ sortInPlace(keys)
+ return keys
+}
+
+// flattenNotes coerces row[key] from string OR []string into a clean
+// non-empty []string. TS form `[x].flat().filter(Boolean)` — Go does
+// it explicitly.
+func flattenNotes(row map[string]any, key string) []string {
+ v, ok := row[key]
+ if !ok || v == nil {
+ return nil
+ }
+ switch t := v.(type) {
+ case string:
+ if t == "" {
+ return nil
+ }
+ return []string{t}
+ case []any:
+ out := make([]string, 0, len(t))
+ for _, e := range t {
+ if s, ok := e.(string); ok && s != "" {
+ out = append(out, s)
+ }
+ }
+ if len(out) == 0 {
+ return nil
+ }
+ return out
+ }
+ return nil
+}
+
+// timeToMS parses an ISO 8601 string and returns milliseconds since
+// epoch, matching TS `new Date(iso).getTime()`. Returns 0 on parse
+// failure; the TS equivalent would yield NaN (and "NaN" in run_id
+// strings), so the Go fallback is the more useful behavior.
+func timeToMS(iso string) int64 {
+ if iso == "" {
+ return 0
+ }
+ for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
+ if t, err := time.Parse(layout, iso); err == nil {
+ return t.UnixMilli()
+ }
+ }
+ return 0
+}
+
+// buildRetrievedContext assembles RetrievedContext from a flat map of
+// already-coerced fields. Returns nil when nothing meaningful is set,
+// so transforms can attach the field conditionally without wrapping
+// the call site.
+func buildRetrievedContext(fields map[string]any) *distillation.RetrievedContext {
+ rc := distillation.RetrievedContext{}
+ set := false // becomes true once any field is populated
+ switch v := fields["matrix_corpora"].(type) {
+ case []string:
+ if len(v) > 0 {
+ rc.MatrixCorpora = v
+ set = true
+ }
+ case []any:
+ // JSON-decoded arrays arrive as []any — coerce string elements
+ // so corpora passed straight from a decoded row aren't dropped.
+ for _, e := range v {
+ if s, ok := e.(string); ok && s != "" {
+ rc.MatrixCorpora = append(rc.MatrixCorpora, s)
+ }
+ }
+ if len(rc.MatrixCorpora) > 0 {
+ set = true
+ }
+ }
+ if v, ok := numFromAny(fields["matrix_hits"]); ok {
+ rc.MatrixHits = int(v)
+ set = true
+ }
+ if v, ok := numFromAny(fields["matrix_chunks_kept"]); ok {
+ rc.MatrixChunksKept = int(v)
+ set = true
+ }
+ if v, ok := numFromAny(fields["matrix_chunks_dropped"]); ok {
+ rc.MatrixChunksDropped = int(v)
+ set = true
+ }
+ if v, ok := numFromAny(fields["pathway_fingerprints_seen"]); ok {
+ rc.PathwayFingerprintsSeen = int(v)
+ set = true
+ }
+ if !set {
+ return nil
+ }
+ return &rc
+}
+
+func numFromAny(v any) (float64, bool) {
+ if v == nil {
+ return 0, false
+ }
+ switch t := v.(type) {
+ case float64:
+ return t, true
+ case int:
+ return float64(t), true
+ case int64:
+ return float64(t), true
+ }
+ return 0, false
+}
+
+func sortInPlace(s []string) {
+ // Tiny insertion sort — corpus lists are typically <10 entries.
+ for i := 1; i < len(s); i++ {
+ for j := i; j > 0 && s[j-1] > s[j]; j-- {
+ s[j-1], s[j] = s[j], s[j-1]
+ }
+ }
+}
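Adding a new source, per the "append a TransformDef" note above — a
hedged sketch with a hypothetical stream name (not a registered
source); it uses only helpers defined in this package and would live
alongside the Transforms list:

```go
// registerExampleStream sketches wiring a hypothetical
// data/_kb/example_stream.jsonl into the materializer. The stream
// name and field mapping are illustrative.
func registerExampleStream() {
	Transforms = append(Transforms, TransformDef{
		SourceFileRelPath: "data/_kb/example_stream.jsonl",
		Transform: func(in TransformInput) *distillation.EvidenceRecord {
			return &distillation.EvidenceRecord{
				RunID:         strDefault(in.Row, "run_id", fmt.Sprintf("example:%d", in.LineOffset)),
				TaskID:        strDefault(in.Row, "task", "example"),
				Timestamp:     strDefault(in.Row, "ts", in.RecordedAt),
				SchemaVersion: distillation.EvidenceSchemaVersion,
				Provenance:    provenance(in),
				ModelRole:     distillation.RoleExecutor,
				Text:          getString(in.Row, "text"),
			}
		},
	})
}
```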
diff --git a/internal/materializer/transforms_test.go b/internal/materializer/transforms_test.go
new file mode 100644
index 0000000..77ab9cc
--- /dev/null
+++ b/internal/materializer/transforms_test.go
@@ -0,0 +1,287 @@
+package materializer
+
+import (
+ "encoding/json"
+ "testing"
+
+ "git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
+)
+
+const fixedRecordedAt = "2026-05-02T00:00:00Z"
+const fixedSigHash = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
+
+func ti(row map[string]any, source string, lineOffset int64) TransformInput {
+ return TransformInput{
+ Row: row,
+ LineOffset: lineOffset,
+ SourceFileRelPath: source,
+ RecordedAt: fixedRecordedAt,
+ SigHash: fixedSigHash,
+ }
+}
+
+func TestExtractorTransform_DistilledFacts(t *testing.T) {
+ in := ti(map[string]any{
+ "run_id": "run-1",
+ "source_label": "lab-3",
+ "created_at": "2026-04-01T00:00:00Z",
+ "extractor": "qwen3.5:latest",
+ "text": "Hello.",
+ }, "data/_kb/distilled_facts.jsonl", 0)
+ rec := extractorTransform(in)
+ if rec == nil {
+ t.Fatal("nil record")
+ }
+ if rec.RunID != "run-1" || rec.TaskID != "lab-3" {
+ t.Fatalf("ids: %+v", rec)
+ }
+ if rec.ModelRole != distillation.RoleExtractor {
+ t.Errorf("role=%v, want extractor", rec.ModelRole)
+ }
+ if rec.ModelProvider != "ollama" {
+ t.Errorf("provider=%q, want ollama", rec.ModelProvider)
+ }
+ if rec.Provenance.SigHash != fixedSigHash {
+ t.Errorf("provenance.sig_hash mismatch: %q", rec.Provenance.SigHash)
+ }
+ if rec.Text != "Hello." {
+ t.Errorf("text=%q", rec.Text)
+ }
+}
+
+func TestExtractorTransform_FallbackIDs(t *testing.T) {
+ in := ti(map[string]any{
+ "created_at": "2026-04-01T00:00:00Z",
+ "text": "row without ids",
+ }, "data/_kb/distilled_procedures.jsonl", 7)
+ rec := extractorTransform(in)
+ if rec.RunID != "distilled_procedures:7" || rec.TaskID != "distilled_procedures:7" {
+ t.Fatalf("fallback ids wrong: %+v", rec)
+ }
+}
+
+func TestContractAnalysesTransform_Fields(t *testing.T) {
+ in := ti(map[string]any{
+ "permit_id": "P-001",
+ "ts": "2026-04-26T12:00:00Z",
+ "matrix_corpora": map[string]any{"workers": 1, "candidates": 1},
+ "matrix_hits": 3.0,
+ "observer_notes": []any{"good", "spec match"},
+ "observer_verdict": "accept",
+ "observer_conf": 85.0,
+ "ok": true,
+ "cost": 2_500_000.0, // micro-units
+ "duration_ms": 1234.0,
+ "contractor": "Acme",
+ "analysis": "Looks good.",
+ }, "data/_kb/contract_analyses.jsonl", 0)
+ rec := contractAnalysesTransform(in)
+ if rec.RunID == "" || rec.TaskID != "permit:P-001" {
+ t.Fatalf("ids: %+v", rec)
+ }
+ if rec.ModelRole != distillation.RoleExecutor {
+ t.Errorf("role=%v", rec.ModelRole)
+ }
+ if rec.RetrievedContext == nil || len(rec.RetrievedContext.MatrixCorpora) != 2 || rec.RetrievedContext.MatrixHits != 3 {
+ t.Errorf("retrieved_context wrong: %+v", rec.RetrievedContext)
+ }
+ if len(rec.ObserverNotes) != 2 {
+ t.Errorf("observer_notes=%v", rec.ObserverNotes)
+ }
+ if string(rec.ObserverVerdict) != "accept" || rec.ObserverConfidence != 85 {
+ t.Errorf("observer fields: %+v", rec)
+ }
+ if rec.CostUSD != 2.5 {
+ t.Errorf("cost should convert micro→USD; got %v", rec.CostUSD)
+ }
+ if rec.LatencyMs != 1234 {
+ t.Errorf("latency: %v", rec.LatencyMs)
+ }
+ if rec.Metadata == nil || rec.Metadata["contractor"] != "Acme" {
+ t.Errorf("metadata.contractor missing: %v", rec.Metadata)
+ }
+ if len(rec.SuccessMarkers) != 1 || rec.SuccessMarkers[0] != "matrix_hits_above_threshold" {
+ t.Errorf("success_markers: %v", rec.SuccessMarkers)
+ }
+ if len(rec.FailureMarkers) != 0 {
+ t.Errorf("expected no failure_markers when ok=true and verdict=accept, got %v", rec.FailureMarkers)
+ }
+}
+
+func TestContractAnalysesTransform_FailureMarkers(t *testing.T) {
+ in := ti(map[string]any{
+ "permit_id": "P-002",
+ "ts": "2026-04-26T12:00:00Z",
+ "observer_verdict": "reject",
+ "ok": false,
+ "analysis": "Issues found.",
+ }, "data/_kb/contract_analyses.jsonl", 1)
+ rec := contractAnalysesTransform(in)
+ if len(rec.FailureMarkers) != 1 || rec.FailureMarkers[0] != "observer_rejected" {
+ t.Errorf("failure_markers: %v", rec.FailureMarkers)
+ }
+}
+
+func TestModeExperimentsTransform_ProviderInference(t *testing.T) {
+ openrouter := ti(map[string]any{
+ "ts": "2026-04-26T12:00:00Z",
+ "task_class": "scrum_review",
+ "model": "anthropic/claude-opus-4-7",
+ "file_path": "src/foo.rs",
+ "sources": map[string]any{"matrix_corpus": []any{"docs"}, "matrix_chunks_kept": 4.0},
+ "latency_ms": 200.0,
+ "response": "ok",
+ }, "data/_kb/mode_experiments.jsonl", 0)
+ rec := modeExperimentsTransform(openrouter)
+ if rec.ModelProvider != "openrouter" {
+ t.Errorf("provider=%q, want openrouter", rec.ModelProvider)
+ }
+
+ cloud := ti(map[string]any{
+ "ts": "2026-04-26T12:00:00Z",
+ "task_class": "scrum_review",
+ "model": "qwen3-coder:480b",
+ "sources": map[string]any{"matrix_corpus": []any{"docs"}},
+ "response": "ok",
+ }, "data/_kb/mode_experiments.jsonl", 1)
+ rec2 := modeExperimentsTransform(cloud)
+ if rec2.ModelProvider != "ollama_cloud" {
+ t.Errorf("provider=%q, want ollama_cloud", rec2.ModelProvider)
+ }
+ if len(rec2.SourceFiles) != 0 {
+ t.Errorf("source_files should be empty when file_path missing; got %v", rec2.SourceFiles)
+ }
+}
+
+func TestObserverEscalationsTransform_Tokens(t *testing.T) {
+ in := ti(map[string]any{
+ "ts": "2026-04-26T12:00:00Z",
+ "sig_hash": "abc",
+ "cluster_endpoint": "/v1/chat",
+ "prompt_tokens": 100.0,
+ "completion_tokens": 50.0,
+ "analysis": "review",
+ }, "data/_kb/observer_escalations.jsonl", 0)
+ rec := observerEscalationsTransform(in)
+ if rec.PromptTokens != 100 || rec.CompletionTokens != 50 {
+ t.Errorf("tokens: prompt=%d completion=%d", rec.PromptTokens, rec.CompletionTokens)
+ }
+ if rec.TaskID != "observer_escalation:/v1/chat" {
+ t.Errorf("task_id=%q", rec.TaskID)
+ }
+}
+
+func TestAuditFactsTransform_TextIsSummary(t *testing.T) {
+ in := ti(map[string]any{
+ "head_sha": "abc123",
+ "pr_number": 11.0,
+ "extracted_at": "2026-04-26T12:00:00Z",
+ "extractor": "qwen2.5",
+ "facts": []any{"f1", "f2"},
+ "entities": []any{"e1"},
+ "relationships": []any{},
+ }, "data/_kb/audit_facts.jsonl", 0)
+ rec := auditFactsTransform(in)
+ var summary map[string]any
+ if err := json.Unmarshal([]byte(rec.Text), &summary); err != nil {
+ t.Fatalf("text not JSON: %v", err)
+ }
+ if summary["facts"].(float64) != 2 || summary["entities"].(float64) != 1 || summary["relationships"].(float64) != 0 {
+ t.Errorf("counts wrong: %+v", summary)
+ }
+}
+
+func TestAutoApplyTransform_DeterministicTimestampFallback(t *testing.T) {
+ in := ti(map[string]any{
+ "action": "committed",
+ "file": "src/x.rs",
+ }, "data/_kb/auto_apply.jsonl", 0)
+ rec := autoApplyTransform(in)
+ if rec.Timestamp != fixedRecordedAt {
+ t.Errorf("expected fallback to RecordedAt %q, got %q", fixedRecordedAt, rec.Timestamp)
+ }
+ if len(rec.SuccessMarkers) != 1 || rec.SuccessMarkers[0] != "committed" {
+ t.Errorf("success_markers: %v", rec.SuccessMarkers)
+ }
+
+ revertedIn := ti(map[string]any{
+ "ts": "2026-04-26T12:00:00Z",
+ "action": "auto_reverted_after_test_fail",
+ "file": "src/x.rs",
+ }, "data/_kb/auto_apply.jsonl", 1)
+ rec2 := autoApplyTransform(revertedIn)
+ if len(rec2.FailureMarkers) != 1 || rec2.FailureMarkers[0] != "auto_reverted_after_test_fail" {
+ t.Errorf("failure_markers: %v", rec2.FailureMarkers)
+ }
+}
+
+func TestAuditsTransform_SeverityRouting(t *testing.T) {
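+	// Expected routing: info/low emit a success marker; medium and
+	// above emit a failure marker (high/critical form the blocking
+	// tier, though this test asserts only marker presence).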
+ cases := []struct {
+ sev string
+ success bool
+ blocking bool
+ medium bool
+ }{
+ {"info", true, false, false},
+ {"low", true, false, false},
+ {"medium", false, false, true},
+ {"high", false, true, false},
+ {"critical", false, true, false},
+ }
+ for _, c := range cases {
+ t.Run(c.sev, func(t *testing.T) {
+ in := ti(map[string]any{
+ "finding_id": "F-1",
+ "phase": "G2",
+ "severity": c.sev,
+ "ts": "2026-04-26T12:00:00Z",
+ "evidence": "details",
+ }, "data/_kb/audits.jsonl", 0)
+ rec := auditsTransform(in)
+ hasSuccess := len(rec.SuccessMarkers) > 0
+ hasFailure := len(rec.FailureMarkers) > 0
+ if hasSuccess != c.success {
+ t.Errorf("severity=%s success=%v wanted %v", c.sev, hasSuccess, c.success)
+ }
+ if hasFailure != (c.blocking || c.medium) {
+ t.Errorf("severity=%s failure=%v wanted %v", c.sev, hasFailure, c.blocking || c.medium)
+ }
+ })
+ }
+}
+
+func TestOutcomesTransform_LatencyAndSuccess(t *testing.T) {
+ in := ti(map[string]any{
+ "run_id": "r-1",
+ "created_at": "2026-04-26T12:00:00Z",
+ "sig_hash": "abc",
+ "elapsed_secs": 1.234,
+ "ok_events": 5.0,
+ "total_events": 5.0,
+ "total_gap_signals": 2.0,
+ "total_citations": 3.0,
+ }, "data/_kb/outcomes.jsonl", 0)
+ rec := outcomesTransform(in)
+ if rec.LatencyMs != 1234 {
+ t.Errorf("latency=%d", rec.LatencyMs)
+ }
+ if len(rec.SuccessMarkers) != 1 || rec.SuccessMarkers[0] != "all_events_ok" {
+ t.Errorf("success: %v", rec.SuccessMarkers)
+ }
+ if g, ok := rec.ValidationResults["gap_signals"].(int64); !ok || g != 2 {
+ t.Errorf("gap_signals: %v", rec.ValidationResults)
+ }
+ if c, ok := rec.ValidationResults["citation_count"].(int64); !ok || c != 3 {
+ t.Errorf("citation_count: %v", rec.ValidationResults)
+ }
+}
+
+func TestTransformByPath_Found(t *testing.T) {
+ td := TransformByPath("data/_kb/distilled_facts.jsonl")
+ if td == nil {
+ t.Fatal("expected to find distilled_facts transform")
+ }
+ if TransformByPath("data/_kb/never_existed.jsonl") != nil {
+ t.Fatal("expected nil for unknown path")
+ }
+}
diff --git a/internal/materializer/validate.go b/internal/materializer/validate.go
new file mode 100644
index 0000000..c705b16
--- /dev/null
+++ b/internal/materializer/validate.go
@@ -0,0 +1,131 @@
+package materializer
+
+import (
+ "fmt"
+ "regexp"
+ "strings"
+ "time"
+
+ "git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
+)
+
+// ValidateEvidenceRecord ports validateEvidenceRecord from
+// auditor/schemas/distillation/evidence_record.ts. Returns nil on
+// success or a slice of human-readable error messages — the
+// materializer logs the slice into distillation_skips.jsonl so an
+// operator can see why a row was rejected without diffing the Go
+// port against the TS original by hand.
+//
+// The validator is intentionally separate from
+// distillation.ValidateScoredRun: scoring runs and evidence records
+// have different shapes and the scorer's validator only covers the
+// scored-run side.
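+//
+// A minimal caller sketch (hypothetical names; the real ingest loop
+// lives in the materializer, not here):
+//
+//	if errs := ValidateEvidenceRecord(rec); errs != nil {
+//		logSkip(rec.Provenance.SourceFile, errs) // hypothetical helper
+//		continue
+//	}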
+func ValidateEvidenceRecord(r distillation.EvidenceRecord) []string {
+ var errs []string
+
+ if r.RunID == "" {
+ errs = append(errs, "run_id: must be non-empty")
+ }
+ if r.TaskID == "" {
+ errs = append(errs, "task_id: must be non-empty")
+ }
+ if !validISOTimestamp(r.Timestamp) {
+ errs = append(errs, fmt.Sprintf("timestamp: not a valid ISO 8601 timestamp: %s", trim(r.Timestamp, 60)))
+ }
+ if r.SchemaVersion != distillation.EvidenceSchemaVersion {
+ errs = append(errs, fmt.Sprintf("schema_version: expected %d, got %d", distillation.EvidenceSchemaVersion, r.SchemaVersion))
+ }
+ errs = append(errs, validateProvenanceFields(r.Provenance)...)
+
+ if r.ModelRole != "" && !isValidModelRole(r.ModelRole) {
+ errs = append(errs, fmt.Sprintf("model_role: must be a known role, got %q", r.ModelRole))
+ }
+ if r.InputHash != "" && !isHexSha256(r.InputHash) {
+ errs = append(errs, "input_hash: must be hex sha256 when present")
+ }
+ if r.OutputHash != "" && !isHexSha256(r.OutputHash) {
+ errs = append(errs, "output_hash: must be hex sha256 when present")
+ }
+ if r.ObserverConfidence < 0 || r.ObserverConfidence > 100 {
+ errs = append(errs, "observer_confidence: must be in [0, 100]")
+ }
+ if r.HumanOverride != nil {
+ if r.HumanOverride.Overrider == "" {
+ errs = append(errs, "human_override.overrider: must be non-empty")
+ }
+ if r.HumanOverride.Reason == "" {
+ errs = append(errs, "human_override.reason: must be non-empty")
+ }
+ if !validISOTimestamp(r.HumanOverride.OverriddenAt) {
+ errs = append(errs, "human_override.overridden_at: must be ISO 8601")
+ }
+ switch r.HumanOverride.Decision {
+ case "accept", "reject", "needs_review":
+ default:
+ errs = append(errs, "human_override.decision: must be accept|reject|needs_review")
+ }
+ }
+
+ if len(errs) == 0 {
+ return nil
+ }
+ return errs
+}
+
+func validateProvenanceFields(p distillation.Provenance) []string {
+ var errs []string
+ if p.SourceFile == "" {
+ errs = append(errs, "provenance.source_file: must be non-empty")
+ }
+ if !isHexSha256(p.SigHash) {
+ errs = append(errs, fmt.Sprintf("provenance.sig_hash: not a valid hex sha256: %s", trim(p.SigHash, 80)))
+ }
+ if !validISOTimestamp(p.RecordedAt) {
+ errs = append(errs, "provenance.recorded_at: must be ISO 8601")
+ }
+ return errs
+}
+
+var (
+ // Permissive ISO 8601 (matches TS regex):
+	//   YYYY-MM-DDTHH:MM:SS(.fraction)?(Z|±HH:MM|±HHMM)?
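+	// e.g. "2026-05-02T03:31:02Z" and "2026-05-02T03:31:02.5+05:30"
+	// match, as does zone-less "2026-05-02T03:31:02" (which
+	// validISOTimestamp below still rejects via the parse check).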
+ isoTimestampRE = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?$`)
+ hexSha256RE = regexp.MustCompile(`^[0-9a-f]{64}$`)
+)
+
+func validISOTimestamp(s string) bool {
+ if s == "" {
+ return false
+ }
+ if !isoTimestampRE.MatchString(s) {
+ return false
+ }
+	// Belt-and-suspenders: also require Go parseability. This is
+	// stricter than the regex above: RFC3339 demands a zone designator
+	// with a colon, so zone-less strings and "+0530"-style offsets
+	// that match the regex are still rejected here.
+ if _, err := time.Parse(time.RFC3339, s); err == nil {
+ return true
+ }
+ if _, err := time.Parse(time.RFC3339Nano, s); err == nil {
+ return true
+ }
+ return false
+}
+
+func isHexSha256(s string) bool {
+ return hexSha256RE.MatchString(s)
+}
+
+func isValidModelRole(role distillation.ModelRole) bool {
+ switch role {
+ case distillation.RoleExecutor, distillation.RoleReviewer, distillation.RoleExtractor,
+ distillation.RoleVerifier, distillation.RoleCategorizer, distillation.RoleTiebreaker,
+ distillation.RoleApplier, distillation.RoleEmbedder, distillation.RoleOther:
+ return true
+ }
+ return false
+}
+
+func trim(s string, n int) string {
+	// Flatten newlines first so skip-log messages stay single-line
+	// even when the input is short, then cut on a rune boundary so a
+	// truncated multi-byte character can't produce invalid UTF-8.
+	s = strings.ReplaceAll(s, "\n", " ")
+	r := []rune(s)
+	if len(r) <= n {
+		return s
+	}
+	return string(r[:n])
+}
diff --git a/scripts/materializer_smoke.sh b/scripts/materializer_smoke.sh
new file mode 100755
index 0000000..b00ea23
--- /dev/null
+++ b/scripts/materializer_smoke.sh
@@ -0,0 +1,73 @@
+#!/usr/bin/env bash
+# materializer smoke — Go port of scripts/distillation/build_evidence_index.ts.
+# Validates that the materializer:
+# - Builds a minimal evidence partition from a synthetic source jsonl
+# - Skips bad-JSON rows into distillation_skips.jsonl
+# - Idempotently dedupes identical rows on re-run (rows_deduped > 0)
+# - Honors --dry-run (no files written, exit 0)
+# - Emits a parseable receipt.json with validation_pass
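+#
+# No arguments; the script cd's to the repo root itself and stages all
+# fixture data under a throwaway mktemp directory, so re-runs are safe.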
+
+set -euo pipefail
+cd "$(dirname "$0")/.."
+
+export PATH="$PATH:/usr/local/go/bin"
+
+echo "[materializer-smoke] building bin/materializer..."
+go build -o bin/materializer ./cmd/materializer
+
+ROOT="$(mktemp -d)"
+trap 'rm -rf "$ROOT"' EXIT INT TERM
+
+mkdir -p "$ROOT/data/_kb"
+cat > "$ROOT/data/_kb/distilled_facts.jsonl" <<EOF
+{"run_id":"r1","source_label":"lab-a","created_at":"2026-04-26T00:00:00Z","extractor":"qwen3.5:latest","text":"first"}
+{"run_id":"r2","source_label":"lab-b","created_at":"2026-04-26T01:00:00Z","extractor":"qwen3.5:latest","text":"second"}
+not-json
+EOF
+
+cat > "$ROOT/data/_kb/observer_escalations.jsonl" <<EOF
+{"ts":"2026-04-26T12:00:00Z","sig_hash":"abc","cluster_endpoint":"/v1/chat","prompt_tokens":42,"completion_tokens":11,"analysis":"esc"}
+EOF
+
+echo "[materializer-smoke] dry-run probe"
+# Materializer exits 1 when validation_pass=false (bad-json row); set -e
+# would kill the script on that. Run with || true and inspect stdout.
+DRY_OUT="$(./bin/materializer -root "$ROOT" -dry-run 2>&1 || true)"
+echo "$DRY_OUT" | grep -q "DRY RUN" || { echo "expected DRY RUN marker: $DRY_OUT"; exit 1; }
+[ ! -d "$ROOT/data/evidence" ] || { echo "dry-run wrote evidence dir"; exit 1; }
+
+echo "[materializer-smoke] first run"
+# Same exit-1 path as dry-run when bad-json present; expect that.
+./bin/materializer -root "$ROOT" || true
+
+OUT_FACTS="$ROOT/data/evidence/$(date -u +'%Y/%m/%d')/distilled_facts.jsonl"
+OUT_OBS="$ROOT/data/evidence/$(date -u +'%Y/%m/%d')/observer_escalations.jsonl"
+SKIPS="$ROOT/data/_kb/distillation_skips.jsonl"
+
+[ -s "$OUT_FACTS" ] || { echo "expected $OUT_FACTS"; exit 1; }
+[ -s "$OUT_OBS" ] || { echo "expected $OUT_OBS"; exit 1; }
+[ -s "$SKIPS" ] || { echo "expected $SKIPS to capture bad-json row"; exit 1; }
+
+GOOD_ROWS=$(wc -l < "$OUT_FACTS")
+[ "$GOOD_ROWS" -eq 2 ] || { echo "expected 2 good rows in $OUT_FACTS, got $GOOD_ROWS"; exit 1; }
+
+# Receipt — find the most recent one and parse validation_pass.
+# (2>/dev/null, xargs -r, and || true keep set -euo pipefail from
+# aborting before the explicit "no receipt" check below.)
+RECEIPT="$(find "$ROOT/reports/distillation" -name 'receipt.json' -print0 2>/dev/null | xargs -0r ls -t | head -1 || true)"
+[ -n "$RECEIPT" ] || { echo "no receipt produced"; exit 1; }
+grep -q '"validation_pass": false' "$RECEIPT" || {
+ echo "expected validation_pass=false (1 row was bad JSON):";
+ cat "$RECEIPT";
+ exit 1;
+}
+
+echo "[materializer-smoke] idempotent re-run"
+./bin/materializer -root "$ROOT" >/tmp/materializer_smoke_rerun.txt 2>&1 || true
+# Rerun should fail validation again (the bad-JSON row is still there),
+# but the two good rows should hit the dedup path instead of being
+# written a second time.
+grep -q "dedup=2" /tmp/materializer_smoke_rerun.txt || {
+ echo "expected dedup=2 on rerun, got:";
+ cat /tmp/materializer_smoke_rerun.txt;
+ exit 1;
+}
+
+echo "[materializer-smoke] PASS"