root 89ca72d471 materializer + replay ports + vectord substrate fix verified at scale
Two threads landing together — the doc edits interleave so they ship
in a single commit.

1. **vectord substrate fix verified at original scale** (closes the
   2026-05-01 thread). Re-ran multitier 5min @ conc=50: 132,211
   scenarios at 438/sec, 6/6 classes at 0% failure (was 4/6 pre-fix).
   Throughput dropped 1,115 → 438/sec because previously-broken
   scenarios now do real HNSW Add work — honest cost of correctness.
   The fix (i.vectors side-store + safeGraphAdd recover wrappers +
   smallIndexRebuildThreshold=32 + saveTask coalescing) holds at the
   footprint that originally surfaced the bug.

2. **Materializer port** — internal/materializer + cmd/materializer +
   scripts/materializer_smoke.sh. Ports scripts/distillation/transforms.ts
   (12 transforms) + build_evidence_index.ts (idempotency, day-partition,
   receipt). On-wire JSON shape matches TS so Bun and Go runs are
   interchangeable. 14 tests green.

3. **Replay port** — internal/replay + cmd/replay +
   scripts/replay_smoke.sh. Ports scripts/distillation/replay.ts
   (retrieve → bundle → /v1/chat → validate → log). Closes audit-FULL
   phase 7 live invocation on the Go side. Both runtimes append to the
   same data/_kb/replay_runs.jsonl (schema=replay_run.v1). 14 tests green.

Side effect on internal/distillation/types.go: EvidenceRecord gained
prompt_tokens, completion_tokens, and metadata fields to mirror the TS
shape the materializer transforms produce.

STATE_OF_PLAY refreshed to 2026-05-02; ARCHITECTURE_COMPARISON decisions
tracker moves the materializer + replay items from _open_ to DONE and
adds the substrate-fix scale verification row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 03:31:02 -05:00

514 lines
14 KiB
Go

package materializer
import (
"bufio"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
)
// MaterializeOptions drives MaterializeAll. Tests construct this with
// a temp Root and override Transforms; the CLI uses defaults.
type MaterializeOptions struct {
	Root       string         // repo root; sources + outputs are relative to it — required
	Transforms []TransformDef // override for tests; nil selects the package-level Transforms table
	RecordedAt string         // ISO 8601 — fixed for the run; required and validated by MaterializeAll
	DryRun     bool           // count but don't write (no outputs, no skips, no receipt file)
}
// SourceResult mirrors TS SourceResult. One per TransformDef: per-source
// row accounting plus the output files this source appended to.
type SourceResult struct {
	SourceFileRelPath string   `json:"source_file_relpath"` // the transform's source, relative to Root
	RowsPresent       bool     `json:"rows_present"`        // false when the source file does not exist
	RowsRead          int      `json:"rows_read"`           // non-empty lines consumed
	RowsWritten       int      `json:"rows_written"`        // rows that survived transform + validation
	RowsSkipped       int      `json:"rows_skipped"`        // parse/transform/validation failures (each logged as a SkipRecord)
	RowsDeduped       int      `json:"rows_deduped"`        // sig_hash already present in the day's partition file
	OutputFiles       []string `json:"output_files"`        // partition files appended to; empty on dry-run or when nothing was written
}
// MaterializeResult is what MaterializeAll returns. Receipt is the
// authoritative "did the run succeed" surface — the rest is plumbing.
type MaterializeResult struct {
	Sources     []SourceResult `json:"sources"`      // per-source accounting, in Transforms order
	Totals      Totals         `json:"totals"`       // flat sums over Sources
	Receipt     Receipt        `json:"receipt"`      // also written to ReceiptPath unless DryRun
	ReceiptPath string         `json:"receipt_path"` // reports/distillation/<stamp>/receipt.json under Root
	EvidenceDir string         `json:"evidence_dir"` // data/evidence under Root
	SkipsPath   string         `json:"skips_path"`   // data/_kb/distillation_skips.jsonl under Root
}
// Totals — flat sum across sources. Field semantics match the
// per-source counters on SourceResult.
type Totals struct {
	RowsRead    int `json:"rows_read"`
	RowsWritten int `json:"rows_written"`
	RowsSkipped int `json:"rows_skipped"`
	RowsDeduped int `json:"rows_deduped"`
}
// Receipt mirrors auditor/schemas/distillation/receipt.ts. Schema
// version pinned to match the TS producer so consumers see the same
// shape regardless of which runtime generated the run.
const ReceiptSchemaVersion = 1

// Receipt is the record of one materializer run: git provenance,
// timing, hashed inputs/outputs, row counts, and whether every row
// survived validation.
type Receipt struct {
	SchemaVersion  int             `json:"schema_version"`
	Command        string          `json:"command"`              // reconstructed CLI invocation (see commandLineOf)
	GitSHA         string          `json:"git_sha"`              // forty zeros when git is unavailable
	GitBranch      string          `json:"git_branch,omitempty"` // omitted when git is unavailable
	GitDirty       bool            `json:"git_dirty"`
	StartedAt      string          `json:"started_at"` // the run's RecordedAt, not a wall-clock sample
	EndedAt        string          `json:"ended_at"`   // wall clock, UTC RFC3339Nano
	DurationMs     int64           `json:"duration_ms"`
	InputFiles     []FileReference `json:"input_files"`
	OutputFiles    []FileReference `json:"output_files"`
	RecordCounts   RecordCounts    `json:"record_counts"`
	ValidationPass bool            `json:"validation_pass"` // true iff zero rows were skipped
	Errors         []string        `json:"errors"`          // normalized to [] (never null) before marshal
	Warnings       []string        `json:"warnings"`        // normalized to [] (never null) before marshal
}
// FileReference pins one input/output file by relative path, content
// hash, and size so a receipt consumer can verify what the run saw.
type FileReference struct {
	Path   string `json:"path"`
	SHA256 string `json:"sha256"` // hex-encoded SHA-256 of the file contents
	Bytes  int64  `json:"bytes"`
}
// RecordCounts carries the run's aggregate row accounting in the
// receipt (same numbers as Totals, under the receipt's field names).
type RecordCounts struct {
	In      int `json:"in"`
	Out     int `json:"out"`
	Skipped int `json:"skipped"`
	Deduped int `json:"deduped"`
}
// SkipRecord is one row in distillation_skips.jsonl. Operators read
// this stream when a run reports rows_skipped > 0.
type SkipRecord struct {
	SourceFile string   `json:"source_file"`
	LineOffset int64    `json:"line_offset"`        // 0-based line within the source file
	Errors     []string `json:"errors"`             // why the row was skipped
	SigHash    string   `json:"sig_hash,omitempty"` // empty when the failure happened before/at hashing
	RecordedAt string   `json:"recorded_at"`
}
// MaterializeAll iterates Transforms[], reads each source JSONL,
// transforms each row, validates, writes to date-partitioned output.
// Returns a Receipt whose ValidationPass tells the caller whether all
// rows survived validation.
//
// RecordedAt and Root are required; RecordedAt must be ISO 8601. With
// DryRun set, rows are counted but nothing is written (no outputs, no
// skips, no receipt file).
func MaterializeAll(opts MaterializeOptions) (MaterializeResult, error) {
	if opts.RecordedAt == "" {
		return MaterializeResult{}, errors.New("MaterializeOptions.RecordedAt required")
	}
	if opts.Root == "" {
		return MaterializeResult{}, errors.New("MaterializeOptions.Root required")
	}
	if !validISOTimestamp(opts.RecordedAt) {
		return MaterializeResult{}, fmt.Errorf("RecordedAt not ISO 8601: %s", opts.RecordedAt)
	}
	// nil Transforms means "use the package defaults"; tests inject
	// their own table.
	transforms := opts.Transforms
	if transforms == nil {
		transforms = Transforms
	}
	// Fixed layout relative to Root: evidence outputs, the skips JSONL
	// operators inspect, and the per-run receipt directory.
	evidenceDir := filepath.Join(opts.Root, "data", "evidence")
	skipsPath := filepath.Join(opts.Root, "data", "_kb", "distillation_skips.jsonl")
	reportsDir := filepath.Join(opts.Root, "reports", "distillation")
	startedMs := time.Now().UnixMilli()
	// One SourceResult per transform; the first failing source aborts
	// the whole run.
	sources := make([]SourceResult, 0, len(transforms))
	for _, t := range transforms {
		sr, err := processSource(t, opts, evidenceDir, skipsPath)
		if err != nil {
			return MaterializeResult{}, fmt.Errorf("processSource %s: %w", t.SourceFileRelPath, err)
		}
		sources = append(sources, sr)
	}
	totals := Totals{}
	for _, s := range sources {
		totals.RowsRead += s.RowsRead
		totals.RowsWritten += s.RowsWritten
		totals.RowsSkipped += s.RowsSkipped
		totals.RowsDeduped += s.RowsDeduped
	}
	endedAt := time.Now().UTC().Format(time.RFC3339Nano)
	durationMs := time.Now().UnixMilli() - startedMs
	// Hash every source file that was actually present. A hashing
	// error here just omits the reference from the receipt — the run
	// itself already succeeded.
	inputFiles := make([]FileReference, 0)
	for _, s := range sources {
		if !s.RowsPresent {
			continue
		}
		path := filepath.Join(opts.Root, s.SourceFileRelPath)
		ref, err := fileReferenceAt(path, s.SourceFileRelPath)
		if err == nil {
			inputFiles = append(inputFiles, ref)
		}
	}
	// Same treatment for the partition files each source appended to;
	// paths are recorded relative to Root.
	// NOTE(review): the TrimPrefix assumes opts.Root has no trailing
	// separator — confirm against CLI/test callers.
	outputFiles := make([]FileReference, 0)
	for _, s := range sources {
		for _, p := range s.OutputFiles {
			rel := strings.TrimPrefix(p, opts.Root+string(os.PathSeparator))
			ref, err := fileReferenceAt(p, rel)
			if err == nil {
				outputFiles = append(outputFiles, ref)
			}
		}
	}
	// Missing sources and skipped rows are warnings, not errors; errs
	// currently has no producer but stays in the receipt shape.
	var (
		errs     []string
		warnings []string
	)
	for _, s := range sources {
		if !s.RowsPresent {
			warnings = append(warnings, fmt.Sprintf("%s: source file not found (skipped)", s.SourceFileRelPath))
		}
		if s.RowsSkipped > 0 {
			warnings = append(warnings, fmt.Sprintf("%s: %d rows skipped (validation/parse errors)", s.SourceFileRelPath, s.RowsSkipped))
		}
	}
	// NOTE(review): StartedAt is the caller-supplied RecordedAt while
	// DurationMs is wall-clock measured here — presumably mirroring the
	// TS producer; confirm the two are meant to differ.
	receipt := Receipt{
		SchemaVersion: ReceiptSchemaVersion,
		Command:       commandLineOf(opts),
		GitSHA:        getGitSHA(opts.Root),
		GitBranch:     getGitBranch(opts.Root),
		GitDirty:      getGitDirty(opts.Root),
		StartedAt:     opts.RecordedAt,
		EndedAt:       endedAt,
		DurationMs:    durationMs,
		InputFiles:    inputFiles,
		OutputFiles:   outputFiles,
		RecordCounts: RecordCounts{
			In:      totals.RowsRead,
			Out:     totals.RowsWritten,
			Skipped: totals.RowsSkipped,
			Deduped: totals.RowsDeduped,
		},
		ValidationPass: totals.RowsSkipped == 0,
		Errors:         emptyToNil(errs),
		Warnings:       emptyToNil(warnings),
	}
	// Receipt directory is stamped with the end timestamp, made
	// path-safe by replacing ":" and "." with "-".
	stamp := strings.NewReplacer(":", "-", ".", "-").Replace(endedAt)
	receiptDir := filepath.Join(reportsDir, stamp)
	receiptPath := filepath.Join(receiptDir, "receipt.json")
	if !opts.DryRun {
		if err := os.MkdirAll(receiptDir, 0o755); err != nil {
			return MaterializeResult{}, fmt.Errorf("mkdir receipt dir: %w", err)
		}
		buf, err := json.MarshalIndent(receipt, "", " ")
		if err != nil {
			return MaterializeResult{}, fmt.Errorf("marshal receipt: %w", err)
		}
		buf = append(buf, '\n')
		if err := os.WriteFile(receiptPath, buf, 0o644); err != nil {
			return MaterializeResult{}, fmt.Errorf("write receipt: %w", err)
		}
	}
	return MaterializeResult{
		Sources:     sources,
		Totals:      totals,
		Receipt:     receipt,
		ReceiptPath: receiptPath,
		EvidenceDir: evidenceDir,
		SkipsPath:   skipsPath,
	}, nil
}
// processSource reads, transforms, validates, and writes a single
// source JSONL.
//
// Per row: parse JSON → canonical sha256 (sig_hash) → dedup against
// hashes already in the day's partition → Transform → validate →
// buffer for append. Every failure becomes a SkipRecord; nothing is
// dropped silently. A missing source file is not an error — the
// zero-value SourceResult (RowsPresent=false) is returned so the
// caller can emit a warning instead.
func processSource(t TransformDef, opts MaterializeOptions, evidenceDir, skipsPath string) (SourceResult, error) {
	srcPath := filepath.Join(opts.Root, t.SourceFileRelPath)
	res := SourceResult{SourceFileRelPath: t.SourceFileRelPath}
	info, err := os.Stat(srcPath)
	if err != nil {
		if os.IsNotExist(err) {
			// Absent source: rows_present stays false, no error.
			return res, nil
		}
		return res, fmt.Errorf("stat %s: %w", srcPath, err)
	}
	if info.IsDir() {
		return res, fmt.Errorf("%s is a directory, not a file", srcPath)
	}
	res.RowsPresent = true
	// Output is partitioned by the run's RecordedAt date:
	// <evidenceDir>/YYYY/MM/DD/<stem>.jsonl
	partition := isoDatePartition(opts.RecordedAt)
	stem := stemFor(t.SourceFileRelPath)
	outDir := filepath.Join(evidenceDir, partition)
	outPath := filepath.Join(outDir, stem+".jsonl")
	if !opts.DryRun {
		if err := os.MkdirAll(outDir, 0o755); err != nil {
			return res, fmt.Errorf("mkdir output dir: %w", err)
		}
	}
	// Idempotency: hashes already in the partition file count as
	// dedups on re-run, not duplicate writes.
	seen, err := loadSeenHashes(outPath)
	if err != nil {
		return res, fmt.Errorf("load seen hashes: %w", err)
	}
	f, err := os.Open(srcPath)
	if err != nil {
		return res, fmt.Errorf("open %s: %w", srcPath, err)
	}
	defer f.Close()
	// Rows and skips are buffered in memory and appended once after a
	// full clean scan, so a mid-scan failure writes nothing.
	var (
		rowsToWrite  []byte
		skipsToWrite []byte
	)
	scanner := bufio.NewScanner(f)
	// Raise the per-line cap to 16 MiB (default Scanner max is 64 KiB).
	scanner.Buffer(make([]byte, 0, 1<<16), 1<<24)
	// Starts at -1 so the first scanned line is offset 0.
	lineOffset := int64(-1)
	for scanner.Scan() {
		lineOffset++
		raw := scanner.Bytes()
		if len(raw) == 0 {
			// Blank lines count as neither read nor skipped.
			continue
		}
		res.RowsRead++
		var row map[string]any
		if err := json.Unmarshal(raw, &row); err != nil {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"JSON.parse failed: " + trim(err.Error(), 200)},
				RecordedAt: opts.RecordedAt,
			})
			continue
		}
		sigHash, err := CanonicalSha256(row)
		if err != nil {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"sig_hash compute failed: " + trim(err.Error(), 200)},
				RecordedAt: opts.RecordedAt,
			})
			continue
		}
		// Dedup within this run and against prior runs (seen was
		// seeded from the existing partition file).
		if _, dup := seen[sigHash]; dup {
			res.RowsDeduped++
			continue
		}
		seen[sigHash] = struct{}{}
		rec := t.Transform(TransformInput{
			Row:               row,
			LineOffset:        lineOffset,
			SourceFileRelPath: t.SourceFileRelPath,
			RecordedAt:        opts.RecordedAt,
			SigHash:           sigHash,
		})
		// nil from Transform means "this row cannot be materialized".
		if rec == nil {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"transform returned nil"},
				SigHash:    sigHash,
				RecordedAt: opts.RecordedAt,
			})
			continue
		}
		if vErrs := ValidateEvidenceRecord(*rec); len(vErrs) > 0 {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     vErrs,
				SigHash:    sigHash,
				RecordedAt: opts.RecordedAt,
			})
			continue
		}
		buf, err := json.Marshal(rec)
		if err != nil {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"marshal output: " + trim(err.Error(), 200)},
				SigHash:    sigHash,
				RecordedAt: opts.RecordedAt,
			})
			continue
		}
		rowsToWrite = append(rowsToWrite, buf...)
		rowsToWrite = append(rowsToWrite, '\n')
		res.RowsWritten++
	}
	if err := scanner.Err(); err != nil {
		return res, fmt.Errorf("scan %s: %w", srcPath, err)
	}
	if !opts.DryRun {
		if len(rowsToWrite) > 0 {
			if err := appendBytes(outPath, rowsToWrite); err != nil {
				return res, fmt.Errorf("append output: %w", err)
			}
			res.OutputFiles = append(res.OutputFiles, outPath)
		}
		if len(skipsToWrite) > 0 {
			if err := os.MkdirAll(filepath.Dir(skipsPath), 0o755); err != nil {
				return res, fmt.Errorf("mkdir skips dir: %w", err)
			}
			if err := appendBytes(skipsPath, skipsToWrite); err != nil {
				return res, fmt.Errorf("append skips: %w", err)
			}
		}
	}
	return res, nil
}
// loadSeenHashes reads sig_hashes from an existing day-partition output
// file. Idempotency: a re-run that produces the same hash is a dedup
// not a duplicate write.
func loadSeenHashes(outPath string) (map[string]struct{}, error) {
seen := map[string]struct{}{}
f, err := os.Open(outPath)
if err != nil {
if os.IsNotExist(err) {
return seen, nil
}
return nil, err
}
defer f.Close()
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 1<<16), 1<<24)
for scanner.Scan() {
raw := scanner.Bytes()
if len(raw) == 0 {
continue
}
var rec struct {
Provenance struct {
SigHash string `json:"sig_hash"`
} `json:"provenance"`
}
if err := json.Unmarshal(raw, &rec); err != nil {
continue // malformed line; ignore
}
if rec.Provenance.SigHash != "" {
seen[rec.Provenance.SigHash] = struct{}{}
}
}
return seen, scanner.Err()
}
// appendSkip marshals sk and appends it to buf as one JSONL line.
// Skips must never be dropped: if json.Marshal somehow fails (it
// should not for this plain struct), a hand-built sentinel line is
// emitted instead. Each embedded string is JSON-marshaled individually
// so quotes/backslashes in the error text are properly escaped — the
// previous fallback interpolated err.Error() raw inside a quoted
// string (and used Go %q escaping), which could corrupt the stream.
func appendSkip(buf []byte, sk SkipRecord) []byte {
	line, err := json.Marshal(sk)
	if err != nil {
		// Marshal of a plain string cannot fail, so the errors below
		// are safely ignored.
		sf, _ := json.Marshal(sk.SourceFile)
		msg, _ := json.Marshal("marshal_skip_failed:" + err.Error())
		ra, _ := json.Marshal(sk.RecordedAt)
		sentinel := fmt.Sprintf(`{"source_file":%s,"line_offset":%d,"errors":[%s],"recorded_at":%s}`+"\n",
			sf, sk.LineOffset, msg, ra)
		return append(buf, sentinel...)
	}
	buf = append(buf, line...)
	return append(buf, '\n')
}
func appendBytes(path string, data []byte) error {
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(data)
return err
}
// isoDatePartition converts an ISO 8601 timestamp into the UTC
// "YYYY/MM/DD" partition path used under data/evidence. Tries the
// nanosecond layout first, then plain RFC3339.
func isoDatePartition(iso string) string {
	for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
		if ts, err := time.Parse(layout, iso); err == nil {
			y, m, d := ts.UTC().Date()
			return fmt.Sprintf("%04d/%02d/%02d", y, int(m), d)
		}
	}
	// Fallback: TS would have produced "NaN/NaN/NaN" — we use
	// "0000/00/00" which is at least a valid path. Materializer
	// fails its own RecordedAt validation before reaching here.
	return "0000/00/00"
}
// fileReferenceAt streams the file at path through SHA-256 (constant
// memory, any size) and returns a FileReference recording relpath, the
// hex digest, and the byte count.
func fileReferenceAt(path, relpath string) (FileReference, error) {
	src, err := os.Open(path)
	if err != nil {
		return FileReference{}, err
	}
	defer src.Close()
	digest := sha256.New()
	size, err := io.Copy(digest, src)
	if err != nil {
		return FileReference{}, err
	}
	ref := FileReference{
		Path:   relpath,
		SHA256: hex.EncodeToString(digest.Sum(nil)),
		Bytes:  size,
	}
	return ref, nil
}
func getGitSHA(root string) string {
out, err := exec.Command("git", "-C", root, "rev-parse", "HEAD").Output()
if err != nil {
return strings.Repeat("0", 40)
}
return strings.TrimSpace(string(out))
}
// getGitBranch returns the abbreviated ref name for HEAD in the repo
// at root, or "" when git fails — the receipt then omits the field
// via its omitempty tag.
func getGitBranch(root string) string {
	raw, err := exec.Command("git", "-C", root, "rev-parse", "--abbrev-ref", "HEAD").Output()
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(raw))
}
// getGitDirty reports whether `git status --porcelain` prints anything
// for the repo at root (uncommitted changes present). When git itself
// fails the tree is reported clean rather than dirty.
func getGitDirty(root string) bool {
	raw, err := exec.Command("git", "-C", root, "status", "--porcelain").Output()
	if err != nil {
		return false
	}
	return len(strings.TrimSpace(string(raw))) > 0
}
// commandLineOf reconstructs the CLI invocation recorded in the
// receipt's Command field. Only the --dry-run flag is reflected; no
// other option surfaces in the recorded command line.
func commandLineOf(opts MaterializeOptions) string {
	if opts.DryRun {
		return "go run ./cmd/materializer --dry-run"
	}
	return "go run ./cmd/materializer"
}
// emptyToNil normalizes a nil or empty slice to a non-nil empty slice
// so the receipt's errors/warnings encode as [] in JSON rather than
// null. NOTE(review): the name is inverted relative to the behavior
// (this is really nilToEmpty); renaming would touch call sites, so it
// is left as-is and documented here.
func emptyToNil(s []string) []string {
	if len(s) > 0 {
		return s
	}
	return []string{}
}