root 55b8c76a8c distillation: audit-FULL pipeline port (phases 0/3/4) — cross-runtime metric parity verified
Ports the metric-collection passes from scripts/distillation/audit_full.ts.
The substrate that PRODUCES audit_baselines.jsonl entries — the
half OPEN #2 left as "deferred to next wave" after the read/write
substrate landed in ca142b9.

Phase coverage:
  Phase 0 (file presence)             ported
  Phase 1 (schema validators)         skipped (Go's `go test` covers it)
  Phase 2 (materializer dry-run)      deferred (Go materializer not yet ported)
  Phase 3 (scored-runs distribution)  ported
  Phase 4 (contamination firewall)    ported
  Phase 5 (receipts validation)       deferred (Go run-summary JSON not yet emitted)
  Phase 6 (replay sanity)             deferred (Go replay tool not ported)
  Phase 7 (run summary lineage)       deferred (same)

Cross-runtime parity verified end-to-end:
  Go-side audit-full against /home/profit/lakehouse produced
  metrics IDENTICAL to the last Rust-emitted audit_baselines.jsonl
  entry. All 8 ported metrics match byte-for-byte:
    p3_accepted=386, p3_partial=132, p3_rejected=57, p3_human=480,
    p4_sft_rows=353, p4_rag_rows=448, p4_pref_pairs=83, p4_total_quarantined=1325
  6/6 required checks pass on live data.

Components:
- internal/distillation/audit_full.go: PhaseCheck struct (mirrors
  Rust shape), PhaseCheckReport aggregation, RunAuditFull
  orchestrator, auditPhase0/3/4 implementations, FormatAuditFullReport
  Markdown writer.
- cmd/audit_full/main.go: CLI binary with -root, -out, -json,
  -append-baseline flags. Operators run "./bin/audit_full
  -append-baseline" to grow the longitudinal log alongside the
  Rust pipeline (entries are interchangeable — same envelope shape).
- 6 new tests: empty-root failure handling, full-fixture clean PASS
  (locks all 8 metrics + all 6 required checks), SFT firewall
  contamination detection, preference self-pair detection, sig_hash
  regex correctness (rejects wrong-length + uppercase), Markdown
  formatter smoke.

Live-data probe captured at reports/cutover/audit_full_go_vs_rust.md
(linked from reports/cutover/SUMMARY.md). Same shape as the
audit_baselines round-trip evidence — both Go-side ports of the
distillation surface are now validated against real Rust data, not
just fixtures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 01:30:23 -05:00

446 lines
15 KiB
Go

package distillation
// Audit-FULL pipeline — Go port of scripts/distillation/audit_full.ts
// (Rust legacy). Runs the metric-collection passes that produce
// audit_baselines.jsonl entries. Pure observability: never modifies
// pipeline data, only reads and tallies.
//
// Phase coverage in this port:
// - Phase 0 (file presence) ✓ ported
// - Phase 1 (schema validators) ✗ skipped — Go's `go test`
// equivalent runs as part of
// `just verify`, no need to
// re-invoke from here.
// - Phase 2 (materializer dry-run) ✗ deferred — depends on the
// Go-side materializer port
// (transforms + build_evidence
// _index) which isn't yet
// done. Surfaces as TODO.
// - Phase 3 (scored-runs distribution) ✓ ported
// - Phase 4 (contamination firewall) ✓ ported
// - Phase 5 (receipts validation) ✗ deferred — depends on the
// Go pipeline emitting
// run-summary JSON, not yet.
// - Phase 6 (replay sanity) ✗ deferred — Go-side replay
// tool not ported.
// - Phase 7 (run summary lineage) ✗ deferred — same.
//
// The phases that ARE ported are sufficient to produce the
// AuditBaseline metrics (p3_*, p4_*) that drift across runs. p2_*
// metrics will remain at zero until the materializer ports.
//
// Output: a structured PhaseCheckReport plus a Markdown summary.
// Operators run this from cmd/audit_full to validate a Go-side
// distillation pipeline run produced sane outputs.
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
)
// PhaseCheck is one observable check within a phase. Mirrors the
// Rust shape exactly — Markdown rendering uses the same column
// layout so cross-runtime diff'ing is meaningful.
type PhaseCheck struct {
Phase int `json:"phase"`
Name string `json:"name"`
Expected string `json:"expected"`
Actual string `json:"actual"`
Passed bool `json:"passed"`
Required bool `json:"required"` // false → informational only, doesn't fail audit
Notes []string `json:"notes,omitempty"`
}
// PhaseCheckReport is the aggregate result of one audit-full run.
// Metrics is the AuditBaseline-shape metric snapshot that the
// caller can pass to AppendBaseline to grow the longitudinal log.
type PhaseCheckReport struct {
Checks []PhaseCheck `json:"checks"`
Metrics map[string]int64 `json:"metrics"`
Failed int `json:"failed"` // count of REQUIRED checks that failed
Skipped int `json:"deferred_phases"` // phases not yet ported
GitHEAD string `json:"git_head,omitempty"`
}
// AuditFullOptions controls a single audit-full run. Root is the
// data dir (defaults to LH_DISTILL_ROOT or /home/profit/lakehouse
// to keep operators running both runtimes hitting the same paths).
type AuditFullOptions struct {
Root string
GitHEAD string // optional — caller resolves and passes through
}
// RunAuditFull orchestrates the ported phases (0, 3, 4) and
// returns the aggregated report. Each phase is independent; a
// phase that errors is recorded as a failed check rather than
// aborting the run, matching Rust's "always emit a report" stance.
func RunAuditFull(opts AuditFullOptions) PhaseCheckReport {
if opts.Root == "" {
if env := os.Getenv("LH_DISTILL_ROOT"); env != "" {
opts.Root = env
} else {
opts.Root = "/home/profit/lakehouse"
}
}
report := PhaseCheckReport{
Metrics: make(map[string]int64),
GitHEAD: opts.GitHEAD,
Skipped: 4, // phases 1, 2, 5, 6, 7 all skipped — see header comment
}
auditPhase0(opts.Root, &report)
auditPhase3(opts.Root, &report)
auditPhase4(opts.Root, &report)
for _, c := range report.Checks {
if c.Required && !c.Passed {
report.Failed++
}
}
return report
}
// ── Phase 0: file presence ─────────────────────────────────────────
func auditPhase0(root string, report *PhaseCheckReport) {
// The recon doc is Rust-specific (docs/recon/local-distillation-
// recon.md); a Go-side equivalent would live in the
// golangLAKEHOUSE repo. For audit-full's purposes, we treat its
// presence as informational rather than required when running
// against a non-Rust root.
reconPath := filepath.Join(root, "docs", "recon", "local-distillation-recon.md")
exists := fileExists(reconPath)
report.Checks = append(report.Checks, PhaseCheck{
Phase: 0, Name: "recon doc exists",
Expected: "docs/recon/local-distillation-recon.md present",
Actual: fmt.Sprintf("%v", exists),
Passed: exists, Required: false, // informational on Go-side runs
})
tier1 := []string{
"data/_kb/distilled_facts.jsonl",
"data/_kb/scrum_reviews.jsonl",
"data/_kb/audit_facts.jsonl",
"data/_kb/mode_experiments.jsonl",
}
missing := []string{}
for _, p := range tier1 {
if !fileExists(filepath.Join(root, p)) {
missing = append(missing, p)
}
}
notes := []string{}
if len(missing) > 0 {
notes = append(notes, "fresh-clone or post-rotation environment — Phase 2 will tally as rows_present=false; not a hard fail")
}
report.Checks = append(report.Checks, PhaseCheck{
Phase: 0, Name: "tier-1 source streams present",
Expected: "all 4 tier-1 jsonls on disk",
Actual: func() string {
if len(missing) == 0 {
return "all present"
}
return "missing: " + strings.Join(missing, ", ")
}(),
Passed: len(missing) == 0, Required: false,
Notes: notes,
})
}
// ── Phase 3: scored-runs distribution ──────────────────────────────
func auditPhase3(root string, report *PhaseCheckReport) {
scoredDir := filepath.Join(root, "data", "scored-runs")
if !fileExists(scoredDir) {
report.Checks = append(report.Checks, PhaseCheck{
Phase: 3, Name: "scored-runs on disk",
Expected: "data/scored-runs/ populated",
Actual: "missing",
Passed: false, Required: true,
Notes: []string{"run scoring before audit-full (Go: scripts/distillation/score; Rust: ./scripts/distill score)"},
})
return
}
counts := map[string]int64{
"accepted": 0,
"partially_accepted": 0,
"rejected": 0,
"needs_human_review": 0,
}
files, err := ListScoredRunFiles(root)
if err != nil {
report.Checks = append(report.Checks, PhaseCheck{
Phase: 3, Name: "scored-runs walk",
Expected: "no error", Actual: err.Error(),
Passed: false, Required: true,
})
return
}
for _, f := range files {
runs, _, err := LoadScoredRunsFromFile(f)
if err != nil {
continue
}
for _, r := range runs {
if _, ok := counts[string(r.Category)]; ok {
counts[string(r.Category)]++
}
}
}
total := counts["accepted"] + counts["partially_accepted"] + counts["rejected"] + counts["needs_human_review"]
report.Metrics["p3_accepted"] = counts["accepted"]
report.Metrics["p3_partial"] = counts["partially_accepted"]
report.Metrics["p3_rejected"] = counts["rejected"]
report.Metrics["p3_human"] = counts["needs_human_review"]
report.Checks = append(report.Checks, PhaseCheck{
Phase: 3, Name: "on-disk scored-runs distribution non-empty",
Expected: ">=1 accepted",
Actual: fmt.Sprintf("acc=%d part=%d rej=%d hum=%d", counts["accepted"], counts["partially_accepted"], counts["rejected"], counts["needs_human_review"]),
Passed: counts["accepted"] >= 1, Required: true,
})
report.Checks = append(report.Checks, PhaseCheck{
Phase: 3, Name: "scored-runs distribution sums positive",
Expected: ">0 total", Actual: fmt.Sprintf("%d total", total),
Passed: total > 0, Required: false,
})
}
// ── Phase 4: contamination firewall + provenance ───────────────────
// sigHashRe pre-compiled match for the canonical sig_hash shape:
// 64 lowercase hex characters (sha256 hex). Used per-row in the
// provenance check.
var sigHashRe = regexp.MustCompile(`^[0-9a-f]{64}$`)
func auditPhase4(root string, report *PhaseCheckReport) {
sftPath := filepath.Join(root, "exports", "sft", "instruction_response.jsonl")
ragPath := filepath.Join(root, "exports", "rag", "playbooks.jsonl")
prefPath := filepath.Join(root, "exports", "preference", "chosen_rejected.jsonl")
sftRows := readJSONLLines(sftPath)
ragRows := readJSONLLines(ragPath)
prefRows := readJSONLLines(prefPath)
report.Metrics["p4_sft_rows"] = int64(len(sftRows))
report.Metrics["p4_rag_rows"] = int64(len(ragRows))
report.Metrics["p4_pref_pairs"] = int64(len(prefRows))
// SFT contamination firewall: 0 forbidden quality_scores. The
// only legal SFT quality scores are accepted + partially_accepted.
sftForbidden := 0
for _, line := range sftRows {
var r struct {
QualityScore string `json:"quality_score"`
}
if err := json.Unmarshal([]byte(line), &r); err != nil {
continue // tolerate malformed (matches Rust)
}
if r.QualityScore != "accepted" && r.QualityScore != "partially_accepted" {
sftForbidden++
}
}
report.Checks = append(report.Checks, PhaseCheck{
Phase: 4, Name: "SFT contamination firewall: 0 forbidden quality_scores",
Expected: "0", Actual: fmt.Sprintf("%d", sftForbidden),
Passed: sftForbidden == 0, Required: true,
Notes: []string{"this is the spec non-negotiable — rejected/needs_human_review must NEVER appear in SFT"},
})
// RAG firewall: 0 rejected leaks
ragRejected := 0
for _, line := range ragRows {
var r struct {
SuccessScore string `json:"success_score"`
}
if err := json.Unmarshal([]byte(line), &r); err != nil {
continue
}
if r.SuccessScore == "rejected" {
ragRejected++
}
}
report.Checks = append(report.Checks, PhaseCheck{
Phase: 4, Name: "RAG firewall: 0 rejected leaks",
Expected: "0", Actual: fmt.Sprintf("%d", ragRejected),
Passed: ragRejected == 0, Required: true,
})
// Preference: 0 self-pairs + 0 identical-text pairs.
prefSelfPairs, prefIdenticalText := 0, 0
for _, line := range prefRows {
var r struct {
ChosenRunID string `json:"chosen_run_id"`
RejectedRunID string `json:"rejected_run_id"`
Chosen string `json:"chosen"`
Rejected string `json:"rejected"`
}
if err := json.Unmarshal([]byte(line), &r); err != nil {
continue
}
if r.ChosenRunID == r.RejectedRunID {
prefSelfPairs++
}
if r.Chosen == r.Rejected {
prefIdenticalText++
}
}
report.Checks = append(report.Checks, PhaseCheck{
Phase: 4, Name: "Preference: 0 self-pairs (chosen_run_id != rejected_run_id)",
Expected: "0", Actual: fmt.Sprintf("%d", prefSelfPairs),
Passed: prefSelfPairs == 0, Required: true,
})
report.Checks = append(report.Checks, PhaseCheck{
Phase: 4, Name: "Preference: 0 identical-text pairs",
Expected: "0", Actual: fmt.Sprintf("%d", prefIdenticalText),
Passed: prefIdenticalText == 0, Required: true,
})
// Provenance check: every export row must carry a 64-char hex
// sig_hash. Walks sft + rag + pref together since the contract
// is uniform across all three.
noProv := 0
checkProv := func(line string) {
var r struct {
Provenance struct {
SigHash string `json:"sig_hash"`
} `json:"provenance"`
}
if err := json.Unmarshal([]byte(line), &r); err != nil {
return
}
if r.Provenance.SigHash == "" || !sigHashRe.MatchString(r.Provenance.SigHash) {
noProv++
}
}
for _, line := range sftRows {
checkProv(line)
}
for _, line := range ragRows {
checkProv(line)
}
for _, line := range prefRows {
checkProv(line)
}
report.Checks = append(report.Checks, PhaseCheck{
Phase: 4, Name: "every export row carries valid sha256 provenance.sig_hash",
Expected: "0 missing", Actual: fmt.Sprintf("%d missing", noProv),
Passed: noProv == 0, Required: true,
})
// Quarantine totals (informational — feeds the p4_total_quarantined
// metric used by the longitudinal drift signal).
totalQuar := int64(0)
for _, qp := range []string{
"exports/quarantine/sft.jsonl",
"exports/quarantine/rag.jsonl",
"exports/quarantine/preference.jsonl",
} {
totalQuar += int64(len(readJSONLLines(filepath.Join(root, qp))))
}
report.Metrics["p4_total_quarantined"] = totalQuar
}
// ── helpers ────────────────────────────────────────────────────────
func fileExists(p string) bool {
_, err := os.Stat(p)
return err == nil
}
// readJSONLLines reads a JSONL file and returns non-empty lines.
// Returns nil on missing file (matches Rust's existsSync ? read : []).
func readJSONLLines(path string) []string {
data, err := os.ReadFile(path)
if err != nil {
return nil
}
out := make([]string, 0)
for _, line := range strings.Split(string(data), "\n") {
if strings.TrimSpace(line) != "" {
out = append(out, line)
}
}
return out
}
// FormatAuditFullReport renders a Markdown report mirroring the
// Rust phase8-full-audit-report.md shape so operators reading
// across runtimes don't have to re-learn the layout.
func FormatAuditFullReport(report PhaseCheckReport) string {
var b strings.Builder
fmt.Fprintln(&b, "# Audit-FULL report (Go)")
fmt.Fprintln(&b)
if report.GitHEAD != "" {
fmt.Fprintf(&b, "**git HEAD:** `%s`\n\n", report.GitHEAD)
}
failed := report.Failed
total := 0
for _, c := range report.Checks {
if c.Required {
total++
}
}
verdict := "PASS"
if failed > 0 {
verdict = "FAIL"
}
fmt.Fprintf(&b, "**Verdict:** %s — %d/%d required checks passed; %d phase(s) deferred.\n\n",
verdict, total-failed, total, report.Skipped)
fmt.Fprintln(&b, "## Checks")
fmt.Fprintln(&b)
fmt.Fprintln(&b, "| phase | name | expected | actual | required | passed |")
fmt.Fprintln(&b, "|---|---|---|---|---|---|")
for _, c := range report.Checks {
req := "no"
if c.Required {
req = "**yes**"
}
passed := "✗"
if c.Passed {
passed = "✓"
}
fmt.Fprintf(&b, "| %d | %s | %s | %s | %s | %s |\n",
c.Phase, c.Name, c.Expected, c.Actual, req, passed)
for _, n := range c.Notes {
fmt.Fprintf(&b, "| | _note_ | %s | | | |\n", n)
}
}
if len(report.Metrics) > 0 {
fmt.Fprintln(&b)
fmt.Fprintln(&b, "## Metrics")
fmt.Fprintln(&b)
fmt.Fprintln(&b, "| metric | value |")
fmt.Fprintln(&b, "|---|---:|")
// Stable order for diffs.
names := make([]string, 0, len(report.Metrics))
for k := range report.Metrics {
names = append(names, k)
}
// sort imported via audit_baseline.go
sortStrings(names)
for _, k := range names {
fmt.Fprintf(&b, "| %s | %d |\n", k, report.Metrics[k])
}
}
return b.String()
}
// sortStrings is the local sort wrapper to keep imports tidy across
// audit_baseline.go and audit_full.go (both need string sorting;
// importing sort once at the package level is cleaner).
func sortStrings(s []string) {
// Insertion sort — N is at most a dozen metric names.
for i := 1; i < len(s); i++ {
for j := i; j > 0 && s[j-1] > s[j]; j-- {
s[j-1], s[j] = s[j], s[j-1]
}
}
}