// Package materializer reads source JSONL files, transforms and
// validates each row, and writes date-partitioned evidence output plus
// a run receipt.
package materializer

import (
	"bufio"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"
)

// MaterializeOptions drives MaterializeAll. Tests construct this with
// a temp Root and override Transforms; the CLI uses defaults.
type MaterializeOptions struct {
	Root       string         // repo root; sources + outputs are relative
	Transforms []TransformDef // override for tests
	RecordedAt string         // ISO 8601 — fixed for the run
	DryRun     bool           // count but don't write
}

// SourceResult mirrors TS SourceResult. One per configured transform;
// RowsPresent is false when the source file does not exist.
type SourceResult struct {
	SourceFileRelPath string   `json:"source_file_relpath"`
	RowsPresent       bool     `json:"rows_present"`
	RowsRead          int      `json:"rows_read"`    // non-blank lines seen
	RowsWritten       int      `json:"rows_written"` // rows that passed transform + validation
	RowsSkipped       int      `json:"rows_skipped"` // parse/transform/validation failures
	RowsDeduped       int      `json:"rows_deduped"` // sig_hash already present in output
	OutputFiles       []string `json:"output_files"` // absolute paths actually appended to
}

// MaterializeResult is what MaterializeAll returns. Receipt is the
// authoritative "did the run succeed" surface — the rest is plumbing.
type MaterializeResult struct {
	Sources     []SourceResult `json:"sources"`
	Totals      Totals         `json:"totals"`
	Receipt     Receipt        `json:"receipt"`
	ReceiptPath string         `json:"receipt_path"`
	EvidenceDir string         `json:"evidence_dir"`
	SkipsPath   string         `json:"skips_path"`
}

// Totals — flat sum across sources.
type Totals struct {
	RowsRead    int `json:"rows_read"`
	RowsWritten int `json:"rows_written"`
	RowsSkipped int `json:"rows_skipped"`
	RowsDeduped int `json:"rows_deduped"`
}

// Receipt mirrors auditor/schemas/distillation/receipt.ts. Schema
// version pinned to match the TS producer so consumers see the same
// shape regardless of which runtime generated the run.
// ReceiptSchemaVersion is pinned to the TS producer's schema version.
const ReceiptSchemaVersion = 1

// Receipt records provenance for one materializer run: command, git
// state, timing, hashed input/output files, and aggregate row counts.
type Receipt struct {
	SchemaVersion  int             `json:"schema_version"`
	Command        string          `json:"command"`
	GitSHA         string          `json:"git_sha"`
	GitBranch      string          `json:"git_branch,omitempty"`
	GitDirty       bool            `json:"git_dirty"`
	StartedAt      string          `json:"started_at"` // the run's RecordedAt
	EndedAt        string          `json:"ended_at"`
	DurationMs     int64           `json:"duration_ms"`
	InputFiles     []FileReference `json:"input_files"`
	OutputFiles    []FileReference `json:"output_files"`
	RecordCounts   RecordCounts    `json:"record_counts"`
	ValidationPass bool            `json:"validation_pass"` // true iff zero rows were skipped
	Errors         []string        `json:"errors"`
	Warnings       []string        `json:"warnings"`
}

// FileReference identifies one input or output file by repo-relative
// path, content hash, and size.
type FileReference struct {
	Path   string `json:"path"`
	SHA256 string `json:"sha256"`
	Bytes  int64  `json:"bytes"`
}

// RecordCounts aggregates row outcomes across the run.
type RecordCounts struct {
	In      int `json:"in"`
	Out     int `json:"out"`
	Skipped int `json:"skipped"`
	Deduped int `json:"deduped"`
}

// SkipRecord is one row in distillation_skips.jsonl. Operators read
// this stream when a run reports rows_skipped > 0.
type SkipRecord struct {
	SourceFile string   `json:"source_file"`
	LineOffset int64    `json:"line_offset"` // 0-based index among non-blank lines read
	Errors     []string `json:"errors"`
	SigHash    string   `json:"sig_hash,omitempty"` // empty when hashing itself failed
	RecordedAt string   `json:"recorded_at"`
}

// MaterializeAll iterates Transforms[], reads each source JSONL,
// transforms each row, validates, writes to date-partitioned output.
// Returns a Receipt whose ValidationPass tells the caller whether all
// rows survived validation.
func MaterializeAll(opts MaterializeOptions) (MaterializeResult, error) { if opts.RecordedAt == "" { return MaterializeResult{}, errors.New("MaterializeOptions.RecordedAt required") } if opts.Root == "" { return MaterializeResult{}, errors.New("MaterializeOptions.Root required") } if !validISOTimestamp(opts.RecordedAt) { return MaterializeResult{}, fmt.Errorf("RecordedAt not ISO 8601: %s", opts.RecordedAt) } transforms := opts.Transforms if transforms == nil { transforms = Transforms } evidenceDir := filepath.Join(opts.Root, "data", "evidence") skipsPath := filepath.Join(opts.Root, "data", "_kb", "distillation_skips.jsonl") reportsDir := filepath.Join(opts.Root, "reports", "distillation") startedMs := time.Now().UnixMilli() sources := make([]SourceResult, 0, len(transforms)) for _, t := range transforms { sr, err := processSource(t, opts, evidenceDir, skipsPath) if err != nil { return MaterializeResult{}, fmt.Errorf("processSource %s: %w", t.SourceFileRelPath, err) } sources = append(sources, sr) } totals := Totals{} for _, s := range sources { totals.RowsRead += s.RowsRead totals.RowsWritten += s.RowsWritten totals.RowsSkipped += s.RowsSkipped totals.RowsDeduped += s.RowsDeduped } endedAt := time.Now().UTC().Format(time.RFC3339Nano) durationMs := time.Now().UnixMilli() - startedMs inputFiles := make([]FileReference, 0) for _, s := range sources { if !s.RowsPresent { continue } path := filepath.Join(opts.Root, s.SourceFileRelPath) ref, err := fileReferenceAt(path, s.SourceFileRelPath) if err == nil { inputFiles = append(inputFiles, ref) } } outputFiles := make([]FileReference, 0) for _, s := range sources { for _, p := range s.OutputFiles { rel := strings.TrimPrefix(p, opts.Root+string(os.PathSeparator)) ref, err := fileReferenceAt(p, rel) if err == nil { outputFiles = append(outputFiles, ref) } } } var ( errs []string warnings []string ) for _, s := range sources { if !s.RowsPresent { warnings = append(warnings, fmt.Sprintf("%s: source file not found (skipped)", 
s.SourceFileRelPath)) } if s.RowsSkipped > 0 { warnings = append(warnings, fmt.Sprintf("%s: %d rows skipped (validation/parse errors)", s.SourceFileRelPath, s.RowsSkipped)) } } receipt := Receipt{ SchemaVersion: ReceiptSchemaVersion, Command: commandLineOf(opts), GitSHA: getGitSHA(opts.Root), GitBranch: getGitBranch(opts.Root), GitDirty: getGitDirty(opts.Root), StartedAt: opts.RecordedAt, EndedAt: endedAt, DurationMs: durationMs, InputFiles: inputFiles, OutputFiles: outputFiles, RecordCounts: RecordCounts{ In: totals.RowsRead, Out: totals.RowsWritten, Skipped: totals.RowsSkipped, Deduped: totals.RowsDeduped, }, ValidationPass: totals.RowsSkipped == 0, Errors: emptyToNil(errs), Warnings: emptyToNil(warnings), } stamp := strings.NewReplacer(":", "-", ".", "-").Replace(endedAt) receiptDir := filepath.Join(reportsDir, stamp) receiptPath := filepath.Join(receiptDir, "receipt.json") if !opts.DryRun { if err := os.MkdirAll(receiptDir, 0o755); err != nil { return MaterializeResult{}, fmt.Errorf("mkdir receipt dir: %w", err) } buf, err := json.MarshalIndent(receipt, "", " ") if err != nil { return MaterializeResult{}, fmt.Errorf("marshal receipt: %w", err) } buf = append(buf, '\n') if err := os.WriteFile(receiptPath, buf, 0o644); err != nil { return MaterializeResult{}, fmt.Errorf("write receipt: %w", err) } } return MaterializeResult{ Sources: sources, Totals: totals, Receipt: receipt, ReceiptPath: receiptPath, EvidenceDir: evidenceDir, SkipsPath: skipsPath, }, nil } // processSource reads, transforms, validates, and writes a single // source JSONL. 
func processSource(t TransformDef, opts MaterializeOptions, evidenceDir, skipsPath string) (SourceResult, error) { srcPath := filepath.Join(opts.Root, t.SourceFileRelPath) res := SourceResult{SourceFileRelPath: t.SourceFileRelPath} info, err := os.Stat(srcPath) if err != nil { if os.IsNotExist(err) { return res, nil } return res, fmt.Errorf("stat %s: %w", srcPath, err) } if info.IsDir() { return res, fmt.Errorf("%s is a directory, not a file", srcPath) } res.RowsPresent = true partition := isoDatePartition(opts.RecordedAt) stem := stemFor(t.SourceFileRelPath) outDir := filepath.Join(evidenceDir, partition) outPath := filepath.Join(outDir, stem+".jsonl") if !opts.DryRun { if err := os.MkdirAll(outDir, 0o755); err != nil { return res, fmt.Errorf("mkdir output dir: %w", err) } } seen, err := loadSeenHashes(outPath) if err != nil { return res, fmt.Errorf("load seen hashes: %w", err) } f, err := os.Open(srcPath) if err != nil { return res, fmt.Errorf("open %s: %w", srcPath, err) } defer f.Close() var ( rowsToWrite []byte skipsToWrite []byte ) scanner := bufio.NewScanner(f) scanner.Buffer(make([]byte, 0, 1<<16), 1<<24) lineOffset := int64(-1) for scanner.Scan() { lineOffset++ raw := scanner.Bytes() if len(raw) == 0 { continue } res.RowsRead++ var row map[string]any if err := json.Unmarshal(raw, &row); err != nil { res.RowsSkipped++ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{ SourceFile: t.SourceFileRelPath, LineOffset: lineOffset, Errors: []string{"JSON.parse failed: " + trim(err.Error(), 200)}, RecordedAt: opts.RecordedAt, }) continue } sigHash, err := CanonicalSha256(row) if err != nil { res.RowsSkipped++ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{ SourceFile: t.SourceFileRelPath, LineOffset: lineOffset, Errors: []string{"sig_hash compute failed: " + trim(err.Error(), 200)}, RecordedAt: opts.RecordedAt, }) continue } if _, dup := seen[sigHash]; dup { res.RowsDeduped++ continue } seen[sigHash] = struct{}{} rec := t.Transform(TransformInput{ Row: row, 
LineOffset: lineOffset, SourceFileRelPath: t.SourceFileRelPath, RecordedAt: opts.RecordedAt, SigHash: sigHash, }) if rec == nil { res.RowsSkipped++ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{ SourceFile: t.SourceFileRelPath, LineOffset: lineOffset, Errors: []string{"transform returned nil"}, SigHash: sigHash, RecordedAt: opts.RecordedAt, }) continue } if vErrs := ValidateEvidenceRecord(*rec); len(vErrs) > 0 { res.RowsSkipped++ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{ SourceFile: t.SourceFileRelPath, LineOffset: lineOffset, Errors: vErrs, SigHash: sigHash, RecordedAt: opts.RecordedAt, }) continue } buf, err := json.Marshal(rec) if err != nil { res.RowsSkipped++ skipsToWrite = appendSkip(skipsToWrite, SkipRecord{ SourceFile: t.SourceFileRelPath, LineOffset: lineOffset, Errors: []string{"marshal output: " + trim(err.Error(), 200)}, SigHash: sigHash, RecordedAt: opts.RecordedAt, }) continue } rowsToWrite = append(rowsToWrite, buf...) rowsToWrite = append(rowsToWrite, '\n') res.RowsWritten++ } if err := scanner.Err(); err != nil { return res, fmt.Errorf("scan %s: %w", srcPath, err) } if !opts.DryRun { if len(rowsToWrite) > 0 { if err := appendBytes(outPath, rowsToWrite); err != nil { return res, fmt.Errorf("append output: %w", err) } res.OutputFiles = append(res.OutputFiles, outPath) } if len(skipsToWrite) > 0 { if err := os.MkdirAll(filepath.Dir(skipsPath), 0o755); err != nil { return res, fmt.Errorf("mkdir skips dir: %w", err) } if err := appendBytes(skipsPath, skipsToWrite); err != nil { return res, fmt.Errorf("append skips: %w", err) } } } return res, nil } // loadSeenHashes reads sig_hashes from an existing day-partition output // file. Idempotency: a re-run that produces the same hash is a dedup // not a duplicate write. 
func loadSeenHashes(outPath string) (map[string]struct{}, error) { seen := map[string]struct{}{} f, err := os.Open(outPath) if err != nil { if os.IsNotExist(err) { return seen, nil } return nil, err } defer f.Close() scanner := bufio.NewScanner(f) scanner.Buffer(make([]byte, 0, 1<<16), 1<<24) for scanner.Scan() { raw := scanner.Bytes() if len(raw) == 0 { continue } var rec struct { Provenance struct { SigHash string `json:"sig_hash"` } `json:"provenance"` } if err := json.Unmarshal(raw, &rec); err != nil { continue // malformed line; ignore } if rec.Provenance.SigHash != "" { seen[rec.Provenance.SigHash] = struct{}{} } } return seen, scanner.Err() } func appendSkip(buf []byte, sk SkipRecord) []byte { out, err := json.Marshal(sk) if err != nil { // Should never happen for the well-typed SkipRecord — fall back // to a sentinel so the materializer doesn't drop the skip silently. return append(buf, []byte(fmt.Sprintf(`{"source_file":%q,"line_offset":%d,"errors":["marshal_skip_failed:%s"],"recorded_at":%q}`+"\n", sk.SourceFile, sk.LineOffset, err.Error(), sk.RecordedAt))...) } buf = append(buf, out...) buf = append(buf, '\n') return buf } func appendBytes(path string, data []byte) error { f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return err } defer f.Close() _, err = f.Write(data) return err } func isoDatePartition(iso string) string { t, err := time.Parse(time.RFC3339Nano, iso) if err != nil { t, err = time.Parse(time.RFC3339, iso) } if err != nil { // Defense-in-depth only — MaterializeAll calls validISOTimestamp // on RecordedAt before processSource ever invokes us, so this // branch is unreachable through the public surface. Kept so a // future direct caller doesn't get a panic; "0000/00/00" is at // least a valid path. 
return "0000/00/00" } t = t.UTC() return fmt.Sprintf("%04d/%02d/%02d", t.Year(), int(t.Month()), t.Day()) } func fileReferenceAt(path, relpath string) (FileReference, error) { f, err := os.Open(path) if err != nil { return FileReference{}, err } defer f.Close() hasher := sha256.New() n, err := io.Copy(hasher, f) if err != nil { return FileReference{}, err } return FileReference{ Path: relpath, SHA256: hex.EncodeToString(hasher.Sum(nil)), Bytes: n, }, nil } func getGitSHA(root string) string { out, err := exec.Command("git", "-C", root, "rev-parse", "HEAD").Output() if err != nil { return strings.Repeat("0", 40) } return strings.TrimSpace(string(out)) } func getGitBranch(root string) string { out, err := exec.Command("git", "-C", root, "rev-parse", "--abbrev-ref", "HEAD").Output() if err != nil { return "" } return strings.TrimSpace(string(out)) } func getGitDirty(root string) bool { out, err := exec.Command("git", "-C", root, "status", "--porcelain").Output() if err != nil { return false } return strings.TrimSpace(string(out)) != "" } func commandLineOf(opts MaterializeOptions) string { cmd := "go run ./cmd/materializer" if opts.DryRun { cmd += " --dry-run" } return cmd } func emptyToNil(s []string) []string { if len(s) == 0 { return []string{} } return s }