Cross-lineage review of 89ca72d (Opus + Kimi + Qwen3-coder).
Convergent findings (≥2 reviewers): NONE.
- Kimi BLOCK (materializer main.go exits 0 on validation fail):
confabulation. Code does os.Exit(1) at lines 65-66.
- Qwen BLOCK (saveTask race condition): confabulation. All access
to inflight/pending is under s.mu.
- Qwen WARN (saveAfter nil deref): confabulation. Explicit
`if h.persist == nil { return }` guard at line 184.
- Opus BLOCK (TestSaveTask_Coalesces): self-withdrawn in same
response.
Opus WARNs actioned:
- Detached docstring on TestAdd_SmallIndex_ConcurrentDistinctIDs —
attached.
- isoDatePartition fallback comment — clarified as defense-in-depth
(MaterializeAll guards upstream; branch unreachable through public
surface).
Disposition + verdicts in reports/scrum/_evidence/2026-05-02/.
Pattern matches feedback_cross_lineage_review.md: only Opus emits
BLOCK-class findings worth verifying; Kimi/Qwen single-reviewer
BLOCKs failed trace verification.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
516 lines
14 KiB
Go
package materializer
|
|
|
|
import (
|
|
"bufio"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// MaterializeOptions drives MaterializeAll. Tests construct this with
// a temp Root and override Transforms; the CLI uses defaults.
// RecordedAt is validated (validISOTimestamp) before any work starts
// and also selects the YYYY/MM/DD output partition.
type MaterializeOptions struct {
	Root       string         // repo root; sources + outputs are relative
	Transforms []TransformDef // override for tests; nil falls back to package-level Transforms
	RecordedAt string         // ISO 8601 — fixed for the run
	DryRun     bool           // count but don't write (no output, skips, or receipt files)
}
|
|
|
|
// SourceResult mirrors TS SourceResult: per-source row accounting for
// one TransformDef, plus the output files this source appended to.
type SourceResult struct {
	SourceFileRelPath string   `json:"source_file_relpath"`
	RowsPresent       bool     `json:"rows_present"` // false when the source file does not exist (run continues)
	RowsRead          int      `json:"rows_read"`    // non-empty lines scanned
	RowsWritten       int      `json:"rows_written"` // rows that survived transform + validation
	RowsSkipped       int      `json:"rows_skipped"` // parse/transform/validation failures; detailed in the skips JSONL
	RowsDeduped       int      `json:"rows_deduped"` // sig_hash already present in the day-partition output
	OutputFiles       []string `json:"output_files"` // paths (under the evidence dir) appended this run
}
|
|
|
|
// MaterializeResult is what MaterializeAll returns. Receipt is the
// authoritative "did the run succeed" surface — the rest is plumbing.
type MaterializeResult struct {
	Sources     []SourceResult `json:"sources"`
	Totals      Totals         `json:"totals"`
	Receipt     Receipt        `json:"receipt"`
	ReceiptPath string         `json:"receipt_path"` // populated even on DryRun, when nothing is actually written
	EvidenceDir string         `json:"evidence_dir"`
	SkipsPath   string         `json:"skips_path"`
}
|
|
|
|
// Totals — flat sum of the per-source counters across all sources.
// Mirrors the SourceResult fields of the same names.
type Totals struct {
	RowsRead    int `json:"rows_read"`
	RowsWritten int `json:"rows_written"`
	RowsSkipped int `json:"rows_skipped"`
	RowsDeduped int `json:"rows_deduped"`
}
|
|
|
|
// Receipt mirrors auditor/schemas/distillation/receipt.ts. Schema
// version pinned to match the TS producer so consumers see the same
// shape regardless of which runtime generated the run.
const ReceiptSchemaVersion = 1

// Receipt is the per-run record written to
// reports/distillation/<stamp>/receipt.json.
type Receipt struct {
	SchemaVersion  int             `json:"schema_version"`
	Command        string          `json:"command"`
	GitSHA         string          `json:"git_sha"` // 40 zeros when git is unavailable
	GitBranch      string          `json:"git_branch,omitempty"`
	GitDirty       bool            `json:"git_dirty"`
	StartedAt      string          `json:"started_at"` // the caller-supplied RecordedAt
	EndedAt        string          `json:"ended_at"`   // RFC 3339 with nanoseconds, UTC
	DurationMs     int64           `json:"duration_ms"`
	InputFiles     []FileReference `json:"input_files"`  // only sources that existed and hashed cleanly
	OutputFiles    []FileReference `json:"output_files"` // only outputs that hashed cleanly
	RecordCounts   RecordCounts    `json:"record_counts"`
	ValidationPass bool            `json:"validation_pass"` // true iff zero rows were skipped
	Errors         []string        `json:"errors"`          // always non-nil so JSON shows [] not null
	Warnings       []string        `json:"warnings"`        // always non-nil so JSON shows [] not null
}
|
|
|
|
// FileReference identifies a file by relative path, SHA-256 content
// hash, and size in bytes (see fileReferenceAt).
type FileReference struct {
	Path   string `json:"path"`
	SHA256 string `json:"sha256"` // lowercase hex
	Bytes  int64  `json:"bytes"`
}
|
|
|
|
// RecordCounts is the receipt's view of the run totals — same numbers
// as Totals, renamed to match the TS receipt schema.
type RecordCounts struct {
	In      int `json:"in"`
	Out     int `json:"out"`
	Skipped int `json:"skipped"`
	Deduped int `json:"deduped"`
}
|
|
|
|
// SkipRecord is one row in distillation_skips.jsonl. Operators read
// this stream when a run reports rows_skipped > 0.
type SkipRecord struct {
	SourceFile string   `json:"source_file"`
	LineOffset int64    `json:"line_offset"` // zero-based line index in the source file
	Errors     []string `json:"errors"`
	SigHash    string   `json:"sig_hash,omitempty"` // empty when the row failed to parse or hash
	RecordedAt string   `json:"recorded_at"`
}
|
|
|
|
// MaterializeAll iterates Transforms[], reads each source JSONL,
// transforms each row, validates, writes to date-partitioned output.
// Returns a Receipt whose ValidationPass tells the caller whether all
// rows survived validation.
//
// Any processSource error aborts the run; per-row problems do NOT —
// they become skips and warnings instead.
func MaterializeAll(opts MaterializeOptions) (MaterializeResult, error) {
	// Fail fast on bad options before touching the filesystem.
	if opts.RecordedAt == "" {
		return MaterializeResult{}, errors.New("MaterializeOptions.RecordedAt required")
	}
	if opts.Root == "" {
		return MaterializeResult{}, errors.New("MaterializeOptions.Root required")
	}
	if !validISOTimestamp(opts.RecordedAt) {
		return MaterializeResult{}, fmt.Errorf("RecordedAt not ISO 8601: %s", opts.RecordedAt)
	}
	transforms := opts.Transforms
	if transforms == nil {
		// Default transform set; tests pass their own via opts.Transforms.
		transforms = Transforms
	}

	evidenceDir := filepath.Join(opts.Root, "data", "evidence")
	skipsPath := filepath.Join(opts.Root, "data", "_kb", "distillation_skips.jsonl")
	reportsDir := filepath.Join(opts.Root, "reports", "distillation")

	startedMs := time.Now().UnixMilli()
	sources := make([]SourceResult, 0, len(transforms))
	for _, t := range transforms {
		sr, err := processSource(t, opts, evidenceDir, skipsPath)
		if err != nil {
			return MaterializeResult{}, fmt.Errorf("processSource %s: %w", t.SourceFileRelPath, err)
		}
		sources = append(sources, sr)
	}

	// Flat sum of per-source counters.
	totals := Totals{}
	for _, s := range sources {
		totals.RowsRead += s.RowsRead
		totals.RowsWritten += s.RowsWritten
		totals.RowsSkipped += s.RowsSkipped
		totals.RowsDeduped += s.RowsDeduped
	}

	endedAt := time.Now().UTC().Format(time.RFC3339Nano)
	durationMs := time.Now().UnixMilli() - startedMs

	// Hash the inputs/outputs for the receipt. Best-effort: a file that
	// fails to open/hash is silently omitted from the references rather
	// than failing the run (the counts above remain authoritative).
	inputFiles := make([]FileReference, 0)
	for _, s := range sources {
		if !s.RowsPresent {
			continue
		}
		path := filepath.Join(opts.Root, s.SourceFileRelPath)
		ref, err := fileReferenceAt(path, s.SourceFileRelPath)
		if err == nil {
			inputFiles = append(inputFiles, ref)
		}
	}
	outputFiles := make([]FileReference, 0)
	for _, s := range sources {
		for _, p := range s.OutputFiles {
			// Re-relativize the absolute output path for the receipt.
			rel := strings.TrimPrefix(p, opts.Root+string(os.PathSeparator))
			ref, err := fileReferenceAt(p, rel)
			if err == nil {
				outputFiles = append(outputFiles, ref)
			}
		}
	}

	var (
		errs     []string
		warnings []string
	)
	for _, s := range sources {
		if !s.RowsPresent {
			warnings = append(warnings, fmt.Sprintf("%s: source file not found (skipped)", s.SourceFileRelPath))
		}
		if s.RowsSkipped > 0 {
			warnings = append(warnings, fmt.Sprintf("%s: %d rows skipped (validation/parse errors)", s.SourceFileRelPath, s.RowsSkipped))
		}
	}

	receipt := Receipt{
		SchemaVersion: ReceiptSchemaVersion,
		Command:       commandLineOf(opts),
		GitSHA:        getGitSHA(opts.Root),
		GitBranch:     getGitBranch(opts.Root),
		GitDirty:      getGitDirty(opts.Root),
		StartedAt:     opts.RecordedAt,
		EndedAt:       endedAt,
		DurationMs:    durationMs,
		InputFiles:    inputFiles,
		OutputFiles:   outputFiles,
		RecordCounts: RecordCounts{
			In:      totals.RowsRead,
			Out:     totals.RowsWritten,
			Skipped: totals.RowsSkipped,
			Deduped: totals.RowsDeduped,
		},
		// A run passes iff no row was dropped anywhere.
		ValidationPass: totals.RowsSkipped == 0,
		Errors:         emptyToNil(errs),
		Warnings:       emptyToNil(warnings),
	}

	// Receipt directory is stamped with EndedAt; ":" and "." are
	// replaced so the stamp is a valid path segment on all platforms.
	stamp := strings.NewReplacer(":", "-", ".", "-").Replace(endedAt)
	receiptDir := filepath.Join(reportsDir, stamp)
	receiptPath := filepath.Join(receiptDir, "receipt.json")
	if !opts.DryRun {
		if err := os.MkdirAll(receiptDir, 0o755); err != nil {
			return MaterializeResult{}, fmt.Errorf("mkdir receipt dir: %w", err)
		}
		buf, err := json.MarshalIndent(receipt, "", "  ")
		if err != nil {
			return MaterializeResult{}, fmt.Errorf("marshal receipt: %w", err)
		}
		buf = append(buf, '\n')
		if err := os.WriteFile(receiptPath, buf, 0o644); err != nil {
			return MaterializeResult{}, fmt.Errorf("write receipt: %w", err)
		}
	}

	// Note: ReceiptPath is returned even on DryRun although the file
	// was not written — callers should gate on DryRun before reading it.
	return MaterializeResult{
		Sources:     sources,
		Totals:      totals,
		Receipt:     receipt,
		ReceiptPath: receiptPath,
		EvidenceDir: evidenceDir,
		SkipsPath:   skipsPath,
	}, nil
}
|
|
|
|
// processSource reads, transforms, validates, and writes a single
// source JSONL.
//
// A missing source file is not an error: the SourceResult comes back
// with RowsPresent == false and the run continues. Per-row failures
// (parse, hash, transform, validation, marshal) are recorded as skip
// records and counted, never returned as errors. Output and skip rows
// are buffered in memory and appended in one write each at the end, so
// a scan error mid-file writes nothing.
func processSource(t TransformDef, opts MaterializeOptions, evidenceDir, skipsPath string) (SourceResult, error) {
	srcPath := filepath.Join(opts.Root, t.SourceFileRelPath)
	res := SourceResult{SourceFileRelPath: t.SourceFileRelPath}

	info, err := os.Stat(srcPath)
	if err != nil {
		if os.IsNotExist(err) {
			// Absent source is a soft skip, reported via RowsPresent.
			return res, nil
		}
		return res, fmt.Errorf("stat %s: %w", srcPath, err)
	}
	if info.IsDir() {
		return res, fmt.Errorf("%s is a directory, not a file", srcPath)
	}
	res.RowsPresent = true

	// Output path: <evidenceDir>/<YYYY/MM/DD>/<stem>.jsonl, partitioned
	// by the run's RecordedAt date.
	partition := isoDatePartition(opts.RecordedAt)
	stem := stemFor(t.SourceFileRelPath)
	outDir := filepath.Join(evidenceDir, partition)
	outPath := filepath.Join(outDir, stem+".jsonl")
	if !opts.DryRun {
		if err := os.MkdirAll(outDir, 0o755); err != nil {
			return res, fmt.Errorf("mkdir output dir: %w", err)
		}
	}

	// Idempotency: hashes already in today's output file dedupe re-runs.
	seen, err := loadSeenHashes(outPath)
	if err != nil {
		return res, fmt.Errorf("load seen hashes: %w", err)
	}

	f, err := os.Open(srcPath)
	if err != nil {
		return res, fmt.Errorf("open %s: %w", srcPath, err)
	}
	defer f.Close()

	var (
		rowsToWrite  []byte // validated output rows, newline-terminated
		skipsToWrite []byte // skip records, newline-terminated
	)

	scanner := bufio.NewScanner(f)
	// Raise the scanner cap to 16 MiB so long JSONL rows don't error.
	scanner.Buffer(make([]byte, 0, 1<<16), 1<<24)
	lineOffset := int64(-1)
	for scanner.Scan() {
		lineOffset++ // counts every line, including blank ones
		raw := scanner.Bytes()
		if len(raw) == 0 {
			// Blank lines are neither read nor skipped.
			continue
		}
		res.RowsRead++

		var row map[string]any
		if err := json.Unmarshal(raw, &row); err != nil {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"JSON.parse failed: " + trim(err.Error(), 200)},
				RecordedAt: opts.RecordedAt,
			})
			continue
		}

		// Content-addressed identity of the raw row; drives dedup and
		// lets a skip record be correlated with its row.
		sigHash, err := CanonicalSha256(row)
		if err != nil {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"sig_hash compute failed: " + trim(err.Error(), 200)},
				RecordedAt: opts.RecordedAt,
			})
			continue
		}
		if _, dup := seen[sigHash]; dup {
			res.RowsDeduped++
			continue
		}
		seen[sigHash] = struct{}{}

		rec := t.Transform(TransformInput{
			Row:               row,
			LineOffset:        lineOffset,
			SourceFileRelPath: t.SourceFileRelPath,
			RecordedAt:        opts.RecordedAt,
			SigHash:           sigHash,
		})
		if rec == nil {
			// nil from Transform means "drop this row" — recorded as a
			// skip so the drop is visible to operators.
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"transform returned nil"},
				SigHash:    sigHash,
				RecordedAt: opts.RecordedAt,
			})
			continue
		}

		if vErrs := ValidateEvidenceRecord(*rec); len(vErrs) > 0 {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     vErrs,
				SigHash:    sigHash,
				RecordedAt: opts.RecordedAt,
			})
			continue
		}

		buf, err := json.Marshal(rec)
		if err != nil {
			res.RowsSkipped++
			skipsToWrite = appendSkip(skipsToWrite, SkipRecord{
				SourceFile: t.SourceFileRelPath,
				LineOffset: lineOffset,
				Errors:     []string{"marshal output: " + trim(err.Error(), 200)},
				SigHash:    sigHash,
				RecordedAt: opts.RecordedAt,
			})
			continue
		}
		rowsToWrite = append(rowsToWrite, buf...)
		rowsToWrite = append(rowsToWrite, '\n')
		res.RowsWritten++
	}
	if err := scanner.Err(); err != nil {
		return res, fmt.Errorf("scan %s: %w", srcPath, err)
	}

	// Flush buffers in a single append each; DryRun writes nothing.
	if !opts.DryRun {
		if len(rowsToWrite) > 0 {
			if err := appendBytes(outPath, rowsToWrite); err != nil {
				return res, fmt.Errorf("append output: %w", err)
			}
			res.OutputFiles = append(res.OutputFiles, outPath)
		}
		if len(skipsToWrite) > 0 {
			if err := os.MkdirAll(filepath.Dir(skipsPath), 0o755); err != nil {
				return res, fmt.Errorf("mkdir skips dir: %w", err)
			}
			if err := appendBytes(skipsPath, skipsToWrite); err != nil {
				return res, fmt.Errorf("append skips: %w", err)
			}
		}
	}

	return res, nil
}
|
|
|
|
// loadSeenHashes reads sig_hashes from an existing day-partition output
// file. Idempotency: a re-run that produces the same hash is a dedup
// not a duplicate write. A missing file yields an empty set, not an
// error; malformed lines are ignored.
func loadSeenHashes(outPath string) (map[string]struct{}, error) {
	seen := make(map[string]struct{})

	f, err := os.Open(outPath)
	if os.IsNotExist(err) {
		return seen, nil
	}
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// Minimal envelope: only provenance.sig_hash matters here.
	type envelope struct {
		Provenance struct {
			SigHash string `json:"sig_hash"`
		} `json:"provenance"`
	}

	sc := bufio.NewScanner(f)
	sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
	for sc.Scan() {
		line := sc.Bytes()
		if len(line) == 0 {
			continue
		}
		var env envelope
		if json.Unmarshal(line, &env) != nil {
			continue // malformed line; ignore
		}
		if hash := env.Provenance.SigHash; hash != "" {
			seen[hash] = struct{}{}
		}
	}
	return seen, sc.Err()
}
|
|
|
|
func appendSkip(buf []byte, sk SkipRecord) []byte {
|
|
out, err := json.Marshal(sk)
|
|
if err != nil {
|
|
// Should never happen for the well-typed SkipRecord — fall back
|
|
// to a sentinel so the materializer doesn't drop the skip silently.
|
|
return append(buf, []byte(fmt.Sprintf(`{"source_file":%q,"line_offset":%d,"errors":["marshal_skip_failed:%s"],"recorded_at":%q}`+"\n",
|
|
sk.SourceFile, sk.LineOffset, err.Error(), sk.RecordedAt))...)
|
|
}
|
|
buf = append(buf, out...)
|
|
buf = append(buf, '\n')
|
|
return buf
|
|
}
|
|
|
|
func appendBytes(path string, data []byte) error {
|
|
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
_, err = f.Write(data)
|
|
return err
|
|
}
|
|
|
|
// isoDatePartition maps an ISO 8601 timestamp to the UTC "YYYY/MM/DD"
// path segment used to partition output files.
func isoDatePartition(iso string) string {
	for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
		if ts, err := time.Parse(layout, iso); err == nil {
			return ts.UTC().Format("2006/01/02")
		}
	}
	// Defense-in-depth only — MaterializeAll calls validISOTimestamp
	// on RecordedAt before processSource ever invokes us, so this
	// branch is unreachable through the public surface. Kept so a
	// future direct caller doesn't get a panic; "0000/00/00" is at
	// least a valid path.
	return "0000/00/00"
}
|
|
|
|
func fileReferenceAt(path, relpath string) (FileReference, error) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return FileReference{}, err
|
|
}
|
|
defer f.Close()
|
|
hasher := sha256.New()
|
|
n, err := io.Copy(hasher, f)
|
|
if err != nil {
|
|
return FileReference{}, err
|
|
}
|
|
return FileReference{
|
|
Path: relpath,
|
|
SHA256: hex.EncodeToString(hasher.Sum(nil)),
|
|
Bytes: n,
|
|
}, nil
|
|
}
|
|
|
|
// getGitSHA returns the commit hash of HEAD for the repo at root, or a
// 40-zero sentinel when git fails (not installed, not a repo, etc.).
func getGitSHA(root string) string {
	cmd := exec.Command("git", "-C", root, "rev-parse", "HEAD")
	raw, err := cmd.Output()
	if err != nil {
		return strings.Repeat("0", 40) // unknown-SHA sentinel
	}
	return strings.TrimSpace(string(raw))
}
|
|
|
|
// getGitBranch returns the abbreviated name of the current branch for
// the repo at root, or "" when git fails (the receipt field is
// omitempty, so the branch then simply drops out of the JSON).
func getGitBranch(root string) string {
	cmd := exec.Command("git", "-C", root, "rev-parse", "--abbrev-ref", "HEAD")
	raw, err := cmd.Output()
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(raw))
}
|
|
|
|
// getGitDirty reports whether the worktree at root has uncommitted
// changes per `git status --porcelain`. When git itself fails we
// cannot tell, and report clean (false).
func getGitDirty(root string) bool {
	cmd := exec.Command("git", "-C", root, "status", "--porcelain")
	raw, err := cmd.Output()
	if err != nil {
		return false // can't tell — treat as clean
	}
	return len(strings.TrimSpace(string(raw))) > 0
}
|
|
|
|
func commandLineOf(opts MaterializeOptions) string {
|
|
cmd := "go run ./cmd/materializer"
|
|
if opts.DryRun {
|
|
cmd += " --dry-run"
|
|
}
|
|
return cmd
|
|
}
|
|
|
|
// emptyToNil normalizes a string slice for JSON encoding: nil or empty
// input becomes a fresh non-nil empty slice, so the receipt field
// serializes as [] rather than null; non-empty input passes through.
// NOTE(review): the name reads inverted relative to the behavior (it
// maps empty→non-nil, not empty→nil) — consider renaming in a
// follow-up that touches all call sites.
func emptyToNil(s []string) []string {
	if len(s) > 0 {
		return s
	}
	return []string{}
}
|