package materializer

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
)

// TransformInput is what each TransformFn receives. Mirrors the TS
// TransformInput shape — every field is supplied by the materializer
// driver, not by the transform.
type TransformInput struct {
	// Row is the decoded JSONL row. Per encoding/json semantics the
	// helper functions below assume numbers arrive as float64.
	Row map[string]any
	// LineOffset is the row's position in the source file; transforms
	// use it as a deterministic fallback key when a row lacks an ID.
	LineOffset        int64
	SourceFileRelPath string // relative to repo root
	RecordedAt        string // ISO 8601, caller's "now"
	SigHash           string // canonical sha256 of row, pre-computed
}

// TransformFn maps a single source row to an EvidenceRecord. Returning
// nil signals "skip this row" — the materializer logs a deterministic
// skip with no record produced.
//
// Transforms must be pure: no I/O, no clock reads, no model calls.
// Any time component must come from the row itself or RecordedAt.
type TransformFn func(in TransformInput) *distillation.EvidenceRecord

// TransformDef binds a source-file path to its TransformFn. Order in
// Transforms[] has no effect (each runs against its own SourceFile).
type TransformDef struct {
	SourceFileRelPath string
	Transform         TransformFn
}

// ─── Transforms — one per source-file. Ports of TRANSFORMS[] in
// scripts/distillation/transforms.ts. Tier 1 first (validated), Tier 2
// second (untested but in-shape). ────────────────────────────────────

// Transforms is the canonical list. CLI passes this to MaterializeAll.
// Adding a new source: append a TransformDef.
var Transforms = []TransformDef{
	// ── Tier 1: validated 100% in Phase 1 ─────────────────────────
	{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	{SourceFileRelPath: "data/_kb/distilled_procedures.jsonl", Transform: extractorTransform},
	{SourceFileRelPath: "data/_kb/distilled_config_hints.jsonl", Transform: extractorTransform},
	{SourceFileRelPath: "data/_kb/contract_analyses.jsonl", Transform: contractAnalysesTransform},
	{SourceFileRelPath: "data/_kb/mode_experiments.jsonl", Transform: modeExperimentsTransform},
	{SourceFileRelPath: "data/_kb/scrum_reviews.jsonl", Transform: scrumReviewsTransform},
	{SourceFileRelPath: "data/_kb/observer_escalations.jsonl", Transform: observerEscalationsTransform},
	{SourceFileRelPath: "data/_kb/audit_facts.jsonl", Transform: auditFactsTransform},
	// ── Tier 2: untested streams that still belong in EvidenceRecord ──
	{SourceFileRelPath: "data/_kb/auto_apply.jsonl", Transform: autoApplyTransform},
	{SourceFileRelPath: "data/_kb/observer_reviews.jsonl", Transform: observerReviewsTransform},
	{SourceFileRelPath: "data/_kb/audits.jsonl", Transform: auditsTransform},
	{SourceFileRelPath: "data/_kb/outcomes.jsonl", Transform: outcomesTransform},
}

// TransformByPath returns the TransformDef for a given source path,
// or nil if no transform is registered. Matches the TS helper.
//
// The returned pointer aliases the Transforms slice entry, so the
// caller sees exactly the registered definition (linear scan is fine:
// the registry holds on the order of a dozen entries).
func TransformByPath(relpath string) *TransformDef {
	for i := range Transforms {
		if Transforms[i].SourceFileRelPath == relpath {
			return &Transforms[i]
		}
	}
	return nil
}

// ─── Per-source transform implementations ─────────────────────────

// extractorTransform powers the three distilled_* sources. Same shape:
// LLM-extracted text with a model_name from `extractor`.
func extractorTransform(in TransformInput) *distillation.EvidenceRecord { stem := stemFor(in.SourceFileRelPath) rec := distillation.EvidenceRecord{ RunID: strDefault(in.Row, "run_id", fmt.Sprintf("%s:%d", stem, in.LineOffset)), TaskID: strDefault(in.Row, "source_label", fmt.Sprintf("%s:%d", stem, in.LineOffset)), Timestamp: getString(in.Row, "created_at"), SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelName: getString(in.Row, "extractor"), ModelRole: distillation.RoleExtractor, ModelProvider: "ollama", Text: getString(in.Row, "text"), } return &rec } // contractAnalysesTransform: per-permit executor with observer signals, // retrieval telemetry, and cost in micro-units that gets converted to // USD. Carries `contractor` in metadata. func contractAnalysesTransform(in TransformInput) *distillation.EvidenceRecord { permitID := getString(in.Row, "permit_id") tsStr := getString(in.Row, "ts") tsMs := timeToMS(tsStr) rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("contract_analysis:%s:%d", permitID, tsMs), TaskID: fmt.Sprintf("permit:%s", permitID), Timestamp: tsStr, SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelRole: distillation.RoleExecutor, Text: getString(in.Row, "analysis"), } if rc := buildRetrievedContext(map[string]any{ "matrix_corpora": objectKeys(in.Row, "matrix_corpora"), "matrix_hits": in.Row["matrix_hits"], }); rc != nil { rec.RetrievedContext = rc } if notes := flattenNotes(in.Row, "observer_notes"); len(notes) > 0 { rec.ObserverNotes = notes } if v, ok := in.Row["observer_verdict"].(string); ok && v != "" { rec.ObserverVerdict = distillation.ObserverVerdict(v) } if c, ok := numFloat(in.Row, "observer_conf"); ok { rec.ObserverConfidence = c } if ok, present := boolField(in.Row, "ok"); present && ok { rec.SuccessMarkers = []string{"matrix_hits_above_threshold"} } verdict := getString(in.Row, "observer_verdict") okPresent, _ := boolField(in.Row, "ok") if !okPresent || verdict 
== "reject" { rec.FailureMarkers = []string{"observer_rejected"} } if cost, ok := numFloat(in.Row, "cost"); ok { rec.CostUSD = cost / 1_000_000.0 } if d, ok := numInt(in.Row, "duration_ms"); ok { rec.LatencyMs = d } if contractor := getString(in.Row, "contractor"); contractor != "" { rec.Metadata = map[string]any{"contractor": contractor} } return &rec } // modeExperimentsTransform: mode_runner per-call traces. Provider // derived from model name shape ("/" → openrouter, else ollama_cloud). func modeExperimentsTransform(in TransformInput) *distillation.EvidenceRecord { tsStr := getString(in.Row, "ts") tsMs := timeToMS(tsStr) filePath := getString(in.Row, "file_path") keySuffix := filePath if keySuffix == "" { keySuffix = fmt.Sprintf("%d", in.LineOffset) } model := getString(in.Row, "model") provider := "ollama_cloud" if strings.Contains(model, "/") { provider = "openrouter" } rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("mode_exec:%d:%s", tsMs, keySuffix), TaskID: getString(in.Row, "task_class"), Timestamp: tsStr, SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelName: model, ModelRole: distillation.RoleExecutor, ModelProvider: provider, Text: getString(in.Row, "response"), } if d, ok := numInt(in.Row, "latency_ms"); ok { rec.LatencyMs = d } if filePath != "" { rec.SourceFiles = []string{filePath} } if sources, ok := in.Row["sources"].(map[string]any); ok { rec.RetrievedContext = buildRetrievedContext(map[string]any{ "matrix_corpora": sources["matrix_corpus"], "matrix_chunks_kept": sources["matrix_chunks_kept"], "matrix_chunks_dropped": sources["matrix_chunks_dropped"], "pathway_fingerprints_seen": sources["bug_fingerprints_count"], }) } return &rec } // scrumReviewsTransform: per-file scrum review traces. Success marker // captures the attempt number when accepted. 
func scrumReviewsTransform(in TransformInput) *distillation.EvidenceRecord { reviewedAt := getString(in.Row, "reviewed_at") tsMs := timeToMS(reviewedAt) file := getString(in.Row, "file") rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("scrum:%d:%s", tsMs, file), TaskID: fmt.Sprintf("scrum_review:%s", file), Timestamp: reviewedAt, SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelName: getString(in.Row, "accepted_model"), ModelRole: distillation.RoleExecutor, Text: getString(in.Row, "suggestions_preview"), } if file != "" { rec.SourceFiles = []string{file} } if a, ok := numInt(in.Row, "accepted_on_attempt"); ok && a > 0 { rec.SuccessMarkers = []string{fmt.Sprintf("accepted_on_attempt_%d", a)} } return &rec } // observerEscalationsTransform: reviewer-class trace; carries token // counts so the SFT exporter sees real usage signals. func observerEscalationsTransform(in TransformInput) *distillation.EvidenceRecord { tsStr := getString(in.Row, "ts") tsMs := timeToMS(tsStr) rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("obs_esc:%d:%s", tsMs, getString(in.Row, "sig_hash")), TaskID: fmt.Sprintf("observer_escalation:%s", strDefault(in.Row, "cluster_endpoint", "?")), Timestamp: tsStr, SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelRole: distillation.RoleReviewer, Text: getString(in.Row, "analysis"), } if pt, ok := numInt(in.Row, "prompt_tokens"); ok { rec.PromptTokens = pt } if ct, ok := numInt(in.Row, "completion_tokens"); ok { rec.CompletionTokens = ct } return &rec } // auditFactsTransform: per-PR auditor extraction. Text is a compact // JSON summary of array lengths (facts/entities/relationships). 
func auditFactsTransform(in TransformInput) *distillation.EvidenceRecord { headSHA := getString(in.Row, "head_sha") prNumber := getString(in.Row, "pr_number") body, _ := json.Marshal(map[string]any{ "facts": arrayLen(in.Row, "facts"), "entities": arrayLen(in.Row, "entities"), "relationships": arrayLen(in.Row, "relationships"), }) rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("audit_facts:%s:%d", headSHA, in.LineOffset), TaskID: fmt.Sprintf("pr:%s", prNumber), Timestamp: getString(in.Row, "extracted_at"), SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelName: getString(in.Row, "extractor"), ModelRole: distillation.RoleExtractor, Text: string(body), } return &rec } // autoApplyTransform: applier traces. Pure metadata — no text payload. // Deterministic ts fallback to RecordedAt when the row lacks one // (matches TS comment about wall-clock leak fix). func autoApplyTransform(in TransformInput) *distillation.EvidenceRecord { ts := getString(in.Row, "ts") if ts == "" { ts = in.RecordedAt } tsMs := timeToMS(ts) action := strDefault(in.Row, "action", "unknown") file := getString(in.Row, "file") keySuffix := file if keySuffix == "" { keySuffix = fmt.Sprintf("%d", in.LineOffset) } rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("auto_apply:%d:%s", tsMs, keySuffix), TaskID: fmt.Sprintf("auto_apply:%s", strDefault(in.Row, "file", "?")), Timestamp: ts, SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelRole: distillation.RoleApplier, } if file != "" { rec.SourceFiles = []string{file} } if action == "committed" { rec.SuccessMarkers = []string{"committed"} } if strings.Contains(action, "reverted") { rec.FailureMarkers = []string{action} } return &rec } // observerReviewsTransform: reviewer-class. Falls back from `ts` to // `reviewed_at`. Mirrors observer_escalations but carries verdict + // confidence + free-form notes. 
func observerReviewsTransform(in TransformInput) *distillation.EvidenceRecord { ts := getString(in.Row, "ts") if ts == "" { ts = getString(in.Row, "reviewed_at") } tsMs := timeToMS(ts) file := getString(in.Row, "file") keySuffix := file if keySuffix == "" { keySuffix = fmt.Sprintf("%d", in.LineOffset) } taskID := fmt.Sprintf("observer_review:%s", keySuffix) if file == "" { taskID = fmt.Sprintf("observer_review:%d", in.LineOffset) } rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("obs_rev:%d:%s", tsMs, keySuffix), TaskID: taskID, Timestamp: ts, SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelRole: distillation.RoleReviewer, } if v, ok := in.Row["verdict"].(string); ok && v != "" { rec.ObserverVerdict = distillation.ObserverVerdict(v) } if c, ok := numFloat(in.Row, "confidence"); ok { rec.ObserverConfidence = c } if notes := flattenNotes(in.Row, "notes"); len(notes) > 0 { rec.ObserverNotes = notes } if text := getString(in.Row, "notes"); text != "" { rec.Text = text } else if review := getString(in.Row, "review"); review != "" { rec.Text = review } return &rec } // auditsTransform: per-finding auditor stream. Severity drives the // success/failure marker shape — info/low → success, medium → // non-fatal failure, high/critical → blocking failure. // // Note on determinism: the TS port falls back to `new Date().toISOString()` // when `ts` is missing, which is non-deterministic. The Go port uses // RecordedAt as the deterministic fallback (matches the // auto_apply fix pattern). 
func auditsTransform(in TransformInput) *distillation.EvidenceRecord { sev := strings.ToLower(strDefault(in.Row, "severity", "unknown")) minor := sev == "info" || sev == "low" blocking := sev == "high" || sev == "critical" medium := sev == "medium" findingID := getString(in.Row, "finding_id") keySuffix := findingID if keySuffix == "" { keySuffix = fmt.Sprintf("%d", in.LineOffset) } phase := getString(in.Row, "phase") taskID := "audit_finding" if phase != "" { taskID = fmt.Sprintf("phase:%s", phase) } ts := getString(in.Row, "ts") if ts == "" { ts = in.RecordedAt } rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("audit_finding:%s", keySuffix), TaskID: taskID, Timestamp: ts, SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelRole: distillation.RoleReviewer, } if minor { rec.SuccessMarkers = []string{fmt.Sprintf("audit_severity_%s", sev)} } if blocking { rec.FailureMarkers = []string{fmt.Sprintf("audit_severity_%s", sev)} } else if medium { rec.FailureMarkers = []string{"audit_severity_medium"} } if ev, ok := in.Row["evidence"].(string); ok && ev != "" { rec.Text = ev } else { rec.Text = getString(in.Row, "resolution") } return &rec } // outcomesTransform: command-runner outcome stream. Latency from // elapsed_secs (× 1000), success when all events ok. 
func outcomesTransform(in TransformInput) *distillation.EvidenceRecord { rec := distillation.EvidenceRecord{ RunID: fmt.Sprintf("outcome:%s", strDefault(in.Row, "run_id", fmt.Sprintf("%d", in.LineOffset))), Timestamp: getString(in.Row, "created_at"), SchemaVersion: distillation.EvidenceSchemaVersion, Provenance: provenance(in), ModelRole: distillation.RoleExecutor, } if sigHash := getString(in.Row, "sig_hash"); sigHash != "" { rec.TaskID = fmt.Sprintf("outcome_sig:%s", sigHash) } else { rec.TaskID = fmt.Sprintf("outcome:%d", in.LineOffset) } if elapsed, ok := numFloat(in.Row, "elapsed_secs"); ok { rec.LatencyMs = int64(elapsed*1000 + 0.5) // rounded } if okEv, ok1 := numInt(in.Row, "ok_events"); ok1 { if total, ok2 := numInt(in.Row, "total_events"); ok2 { if total > 0 && okEv == total { rec.SuccessMarkers = []string{"all_events_ok"} } } } if g, ok := numInt(in.Row, "total_gap_signals"); ok { vr := map[string]any{"gap_signals": g} if c, ok2 := numInt(in.Row, "total_citations"); ok2 { vr["citation_count"] = c } rec.ValidationResults = vr } return &rec } // ─── Helpers — coercion + extraction patterns shared by transforms ── func provenance(in TransformInput) distillation.Provenance { return distillation.Provenance{ SourceFile: in.SourceFileRelPath, LineOffset: in.LineOffset, SigHash: in.SigHash, RecordedAt: in.RecordedAt, } } // stemFor extracts "distilled_facts" from "data/_kb/distilled_facts.jsonl". func stemFor(relpath string) string { idx := strings.LastIndex(relpath, "/") base := relpath if idx >= 0 { base = relpath[idx+1:] } return strings.TrimSuffix(base, ".jsonl") } // getString returns row[key] as a string, or "" if missing/wrong-type. 
func getString(row map[string]any, key string) string {
	v, ok := row[key]
	if !ok || v == nil {
		return ""
	}
	switch t := v.(type) {
	case string:
		return t
	case float64:
		// %v prints whole floats without a trailing ".0" (3.0 → "3"),
		// which is what the ID/fallback call sites rely on.
		return fmt.Sprintf("%v", t)
	case bool:
		return fmt.Sprintf("%t", t)
	default:
		// Last-resort stringification for arrays/objects etc.
		return fmt.Sprintf("%v", t)
	}
}

// strDefault returns row[key] coerced to string, or fallback if empty/missing.
func strDefault(row map[string]any, key, fallback string) string {
	if s := getString(row, key); s != "" {
		return s
	}
	return fallback
}

// numInt returns row[key] as int64. JSON numbers come in as float64.
// Returns (val, true) when present and finite (and within int64 range;
// floats truncate toward zero), else (0, false).
func numInt(row map[string]any, key string) (int64, bool) {
	v, ok := row[key]
	if !ok || v == nil {
		return 0, false
	}
	switch t := v.(type) {
	case float64:
		// Previously NaN/±Inf/out-of-range floats fell straight into
		// int64(t), which is implementation-defined in Go. Enforce the
		// documented "finite" contract: (t != t) is the import-free
		// NaN test, and the bounds are exactly ±2^63 as float64.
		if t != t || t < -9223372036854775808.0 || t >= 9223372036854775808.0 {
			return 0, false
		}
		return int64(t), true
	case int:
		return int64(t), true
	case int64:
		return t, true
	}
	return 0, false
}

// numFloat returns row[key] as float64.
func numFloat(row map[string]any, key string) (float64, bool) {
	v, ok := row[key]
	if !ok || v == nil {
		return 0, false
	}
	switch t := v.(type) {
	case float64:
		return t, true
	case int:
		return float64(t), true
	case int64:
		return float64(t), true
	}
	return 0, false
}

// boolField returns (value, present). present=false when key missing
// or non-bool. Note the order: VALUE first, presence second.
func boolField(row map[string]any, key string) (bool, bool) {
	v, ok := row[key]
	if !ok {
		return false, false
	}
	if b, isBool := v.(bool); isBool {
		return b, true
	}
	return false, false
}

// arrayLen returns len(row[key]) if it's an array, else 0.
func arrayLen(row map[string]any, key string) int {
	if a, ok := row[key].([]any); ok {
		return len(a)
	}
	return 0
}

// objectKeys returns sorted keys of row[key] when it's a map. Returns
// nil when missing or non-map (so callers can treat empty corpus list
// as "field absent").
func objectKeys(row map[string]any, key string) []string { m, ok := row[key].(map[string]any) if !ok || len(m) == 0 { return nil } keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } // Sort for determinism — TS Object.keys() order is insertion-order // in modern engines but Go map iteration is randomized. sortInPlace(keys) return keys } // flattenNotes coerces row[key] from string OR []string into a clean // non-empty []string. TS form `[x].flat().filter(Boolean)` — Go does // it explicitly. func flattenNotes(row map[string]any, key string) []string { v, ok := row[key] if !ok || v == nil { return nil } switch t := v.(type) { case string: if t == "" { return nil } return []string{t} case []any: out := make([]string, 0, len(t)) for _, e := range t { if s, ok := e.(string); ok && s != "" { out = append(out, s) } } if len(out) == 0 { return nil } return out } return nil } // timeToMS parses an ISO 8601 string and returns milliseconds since // epoch, matching TS `new Date(iso).getTime()`. Returns 0 on parse // failure (matches TS NaN coerced to 0 by Number(...) in run_id paths, // although there it'd produce "NaN" — the Go behavior is more useful). func timeToMS(iso string) int64 { if iso == "" { return 0 } for _, layout := range []string{time.RFC3339Nano, time.RFC3339} { if t, err := time.Parse(layout, iso); err == nil { return t.UnixMilli() } } return 0 } // buildRetrievedContext assembles RetrievedContext from a flat map of // already-coerced fields. Returns nil when nothing meaningful is set, // so transforms can attach the field conditionally without wrapping // the call site. 
func buildRetrievedContext(fields map[string]any) *distillation.RetrievedContext { rc := distillation.RetrievedContext{} any := false if v, ok := fields["matrix_corpora"].([]string); ok && len(v) > 0 { rc.MatrixCorpora = v any = true } if v, ok := numFromAny(fields["matrix_hits"]); ok { rc.MatrixHits = int(v) any = true } if v, ok := numFromAny(fields["matrix_chunks_kept"]); ok { rc.MatrixChunksKept = int(v) any = true } if v, ok := numFromAny(fields["matrix_chunks_dropped"]); ok { rc.MatrixChunksDropped = int(v) any = true } if v, ok := numFromAny(fields["pathway_fingerprints_seen"]); ok { rc.PathwayFingerprintsSeen = int(v) any = true } if !any { return nil } return &rc } func numFromAny(v any) (float64, bool) { if v == nil { return 0, false } switch t := v.(type) { case float64: return t, true case int: return float64(t), true case int64: return float64(t), true } return 0, false } func sortInPlace(s []string) { // Tiny insertion sort — corpus lists are typically <10 entries. for i := 1; i < len(s); i++ { for j := i; j > 0 && s[j-1] > s[j]; j-- { s[j-1], s[j] = s[j], s[j-1] } } }