root 89ca72d471 materializer + replay ports + vectord substrate fix verified at scale
Two threads landing together — the doc edits interleave so they ship
in a single commit.

1. **vectord substrate fix verified at original scale** (closes the
   2026-05-01 thread). Re-ran multitier 5min @ conc=50: 132,211
   scenarios at 438/sec, 6/6 classes at 0% failure (was 4/6 pre-fix).
   Throughput dropped 1,115 → 438/sec because previously-broken
   scenarios now do real HNSW Add work — honest cost of correctness.
   The fix (i.vectors side-store + safeGraphAdd recover wrappers +
   smallIndexRebuildThreshold=32 + saveTask coalescing) holds at the
   footprint that originally surfaced the bug.

2. **Materializer port** — internal/materializer + cmd/materializer +
   scripts/materializer_smoke.sh. Ports scripts/distillation/transforms.ts
   (12 transforms) + build_evidence_index.ts (idempotency, day-partition,
   receipt). On-wire JSON shape matches TS so Bun and Go runs are
   interchangeable. 14 tests green.

3. **Replay port** — internal/replay + cmd/replay +
   scripts/replay_smoke.sh. Ports scripts/distillation/replay.ts
   (retrieve → bundle → /v1/chat → validate → log). Closes audit-FULL
   phase 7 live invocation on the Go side. Both runtimes append to the
   same data/_kb/replay_runs.jsonl (schema=replay_run.v1). 14 tests green.

Side effect on internal/distillation/types.go: EvidenceRecord gained
prompt_tokens, completion_tokens, and metadata fields to mirror the TS
shape the materializer transforms produce.

STATE_OF_PLAY refreshed to 2026-05-02; ARCHITECTURE_COMPARISON decisions
tracker moves the materializer + replay items from _open_ to DONE and
adds the substrate-fix scale verification row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 03:31:02 -05:00

654 lines
21 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package materializer
import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
)
// TransformInput is what each TransformFn receives. Mirrors the TS
// TransformInput shape — every field is supplied by the materializer
// driver, not by the transform.
type TransformInput struct {
	Row               map[string]any // one decoded JSONL row from the source file
	LineOffset        int64          // line index of Row in the source; transforms use it as a deterministic ID fallback
	SourceFileRelPath string         // relative to repo root
	RecordedAt        string         // ISO 8601, caller's "now"
	SigHash           string         // canonical sha256 of row, pre-computed
}
// TransformFn maps a single source row to an EvidenceRecord. Returning
// nil signals "skip this row" — the materializer logs a deterministic
// skip with no record produced.
//
// Transforms must be pure: no I/O, no clock reads, no model calls.
// Any time component must come from the row itself or RecordedAt, so
// a re-run over the same input produces byte-identical records.
type TransformFn func(in TransformInput) *distillation.EvidenceRecord
// TransformDef binds a source-file path to its TransformFn. Order in
// Transforms[] has no effect (each runs against its own SourceFile).
type TransformDef struct {
	SourceFileRelPath string      // repo-relative path of the JSONL source this transform consumes
	Transform         TransformFn // pure row → EvidenceRecord mapping for that source
}
// ─── Transforms — one per source-file. Ports of TRANSFORMS[] in
// scripts/distillation/transforms.ts. Tier 1 first (validated), Tier 2
// second (untested but in-shape). ────────────────────────────────────

// Transforms is the canonical list. CLI passes this to MaterializeAll.
// Adding a new source: append a TransformDef. Paths must be unique —
// TransformByPath returns the first match.
var Transforms = []TransformDef{
	// ── Tier 1: validated 100% in Phase 1 ─────────────────────────
	// The three distilled_* streams share one row shape and one transform.
	{SourceFileRelPath: "data/_kb/distilled_facts.jsonl", Transform: extractorTransform},
	{SourceFileRelPath: "data/_kb/distilled_procedures.jsonl", Transform: extractorTransform},
	{SourceFileRelPath: "data/_kb/distilled_config_hints.jsonl", Transform: extractorTransform},
	{SourceFileRelPath: "data/_kb/contract_analyses.jsonl", Transform: contractAnalysesTransform},
	{SourceFileRelPath: "data/_kb/mode_experiments.jsonl", Transform: modeExperimentsTransform},
	{SourceFileRelPath: "data/_kb/scrum_reviews.jsonl", Transform: scrumReviewsTransform},
	{SourceFileRelPath: "data/_kb/observer_escalations.jsonl", Transform: observerEscalationsTransform},
	{SourceFileRelPath: "data/_kb/audit_facts.jsonl", Transform: auditFactsTransform},
	// ── Tier 2: untested streams that still belong in EvidenceRecord ──
	{SourceFileRelPath: "data/_kb/auto_apply.jsonl", Transform: autoApplyTransform},
	{SourceFileRelPath: "data/_kb/observer_reviews.jsonl", Transform: observerReviewsTransform},
	{SourceFileRelPath: "data/_kb/audits.jsonl", Transform: auditsTransform},
	{SourceFileRelPath: "data/_kb/outcomes.jsonl", Transform: outcomesTransform},
}
// TransformByPath looks up the registered TransformDef for relpath,
// returning nil when no transform is registered. Matches the TS helper.
// The returned pointer aliases the Transforms slice entry.
func TransformByPath(relpath string) *TransformDef {
	for i, def := range Transforms {
		if def.SourceFileRelPath == relpath {
			return &Transforms[i]
		}
	}
	return nil
}
// ─── Per-source transform implementations ─────────────────────────
// extractorTransform powers the three distilled_* sources. They share
// one row shape: LLM-extracted text whose model name comes from the
// row's `extractor` field, always attributed to the ollama provider.
// Missing run_id/source_label fall back to "<stem>:<line>" so IDs stay
// deterministic and unique per source line.
func extractorTransform(in TransformInput) *distillation.EvidenceRecord {
	fallbackID := fmt.Sprintf("%s:%d", stemFor(in.SourceFileRelPath), in.LineOffset)
	return &distillation.EvidenceRecord{
		RunID:         strDefault(in.Row, "run_id", fallbackID),
		TaskID:        strDefault(in.Row, "source_label", fallbackID),
		Timestamp:     getString(in.Row, "created_at"),
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelName:     getString(in.Row, "extractor"),
		ModelRole:     distillation.RoleExtractor,
		ModelProvider: "ollama",
		Text:          getString(in.Row, "text"),
	}
}
// contractAnalysesTransform: per-permit executor trace with observer
// signals, retrieval telemetry, and cost in micro-units converted to
// USD. Carries `contractor` in metadata when present.
//
// Marker semantics (preserved from the TS port): the success marker
// requires an explicit `ok: true`; the failure marker fires when `ok`
// is false, missing, or non-bool, OR when the observer verdict is
// "reject".
func contractAnalysesTransform(in TransformInput) *distillation.EvidenceRecord {
	permitID := getString(in.Row, "permit_id")
	tsStr := getString(in.Row, "ts")
	rec := distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("contract_analysis:%s:%d", permitID, timeToMS(tsStr)),
		TaskID:        fmt.Sprintf("permit:%s", permitID),
		Timestamp:     tsStr,
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelRole:     distillation.RoleExecutor,
		Text:          getString(in.Row, "analysis"),
	}
	if rc := buildRetrievedContext(map[string]any{
		"matrix_corpora": objectKeys(in.Row, "matrix_corpora"),
		"matrix_hits":    in.Row["matrix_hits"],
	}); rc != nil {
		rec.RetrievedContext = rc
	}
	if notes := flattenNotes(in.Row, "observer_notes"); len(notes) > 0 {
		rec.ObserverNotes = notes
	}
	if v, ok := in.Row["observer_verdict"].(string); ok && v != "" {
		rec.ObserverVerdict = distillation.ObserverVerdict(v)
	}
	if c, ok := numFloat(in.Row, "observer_conf"); ok {
		rec.ObserverConfidence = c
	}
	// FIX: the original called boolField twice and bound the *value*
	// of the second call to a variable named `okPresent`, silently
	// discarding the presence flag — it read like a swapped-return
	// bug. One call, honest names, identical behavior.
	okVal, okFieldPresent := boolField(in.Row, "ok")
	if okFieldPresent && okVal {
		rec.SuccessMarkers = []string{"matrix_hits_above_threshold"}
	}
	if !okVal || getString(in.Row, "observer_verdict") == "reject" {
		rec.FailureMarkers = []string{"observer_rejected"}
	}
	if cost, ok := numFloat(in.Row, "cost"); ok {
		// Upstream logs cost in micro-units; EvidenceRecord wants USD.
		rec.CostUSD = cost / 1_000_000.0
	}
	if d, ok := numInt(in.Row, "duration_ms"); ok {
		rec.LatencyMs = d
	}
	if contractor := getString(in.Row, "contractor"); contractor != "" {
		rec.Metadata = map[string]any{"contractor": contractor}
	}
	return &rec
}
// modeExperimentsTransform: mode_runner per-call traces. The provider
// is derived from the model name's shape — names containing "/" are
// openrouter models, everything else is ollama_cloud.
func modeExperimentsTransform(in TransformInput) *distillation.EvidenceRecord {
	tsStr := getString(in.Row, "ts")
	filePath := getString(in.Row, "file_path")
	keySuffix := filePath
	if filePath == "" {
		// No file path on the row: fall back to the line offset so the
		// run ID stays unique.
		keySuffix = fmt.Sprintf("%d", in.LineOffset)
	}
	model := getString(in.Row, "model")
	provider := "ollama_cloud"
	if strings.Contains(model, "/") {
		provider = "openrouter"
	}
	rec := &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("mode_exec:%d:%s", timeToMS(tsStr), keySuffix),
		TaskID:        getString(in.Row, "task_class"),
		Timestamp:     tsStr,
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelName:     model,
		ModelRole:     distillation.RoleExecutor,
		ModelProvider: provider,
		Text:          getString(in.Row, "response"),
	}
	if latency, ok := numInt(in.Row, "latency_ms"); ok {
		rec.LatencyMs = latency
	}
	if filePath != "" {
		rec.SourceFiles = []string{filePath}
	}
	if sources, ok := in.Row["sources"].(map[string]any); ok {
		rec.RetrievedContext = buildRetrievedContext(map[string]any{
			"matrix_corpora":            sources["matrix_corpus"],
			"matrix_chunks_kept":        sources["matrix_chunks_kept"],
			"matrix_chunks_dropped":     sources["matrix_chunks_dropped"],
			"pathway_fingerprints_seen": sources["bug_fingerprints_count"],
		})
	}
	return rec
}
// scrumReviewsTransform: per-file scrum review traces. When a
// suggestion was accepted, the success marker records which attempt
// number won.
func scrumReviewsTransform(in TransformInput) *distillation.EvidenceRecord {
	reviewedAt := getString(in.Row, "reviewed_at")
	file := getString(in.Row, "file")
	rec := &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("scrum:%d:%s", timeToMS(reviewedAt), file),
		TaskID:        fmt.Sprintf("scrum_review:%s", file),
		Timestamp:     reviewedAt,
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelName:     getString(in.Row, "accepted_model"),
		ModelRole:     distillation.RoleExecutor,
		Text:          getString(in.Row, "suggestions_preview"),
	}
	if file != "" {
		rec.SourceFiles = []string{file}
	}
	if attempt, ok := numInt(in.Row, "accepted_on_attempt"); ok && attempt > 0 {
		rec.SuccessMarkers = []string{fmt.Sprintf("accepted_on_attempt_%d", attempt)}
	}
	return rec
}
// observerEscalationsTransform: reviewer-class trace. Prompt and
// completion token counts are carried through so the SFT exporter sees
// real usage signals.
func observerEscalationsTransform(in TransformInput) *distillation.EvidenceRecord {
	tsStr := getString(in.Row, "ts")
	rec := &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("obs_esc:%d:%s", timeToMS(tsStr), getString(in.Row, "sig_hash")),
		TaskID:        fmt.Sprintf("observer_escalation:%s", strDefault(in.Row, "cluster_endpoint", "?")),
		Timestamp:     tsStr,
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelRole:     distillation.RoleReviewer,
		Text:          getString(in.Row, "analysis"),
	}
	if tokens, ok := numInt(in.Row, "prompt_tokens"); ok {
		rec.PromptTokens = tokens
	}
	if tokens, ok := numInt(in.Row, "completion_tokens"); ok {
		rec.CompletionTokens = tokens
	}
	return rec
}
// auditFactsTransform: per-PR auditor extraction. The Text payload is a
// compact JSON summary of array lengths (facts/entities/relationships)
// rather than the arrays themselves.
func auditFactsTransform(in TransformInput) *distillation.EvidenceRecord {
	summary := map[string]any{
		"facts":         arrayLen(in.Row, "facts"),
		"entities":      arrayLen(in.Row, "entities"),
		"relationships": arrayLen(in.Row, "relationships"),
	}
	// Marshal of a map[string]any holding only ints cannot fail.
	body, _ := json.Marshal(summary)
	return &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("audit_facts:%s:%d", getString(in.Row, "head_sha"), in.LineOffset),
		TaskID:        fmt.Sprintf("pr:%s", getString(in.Row, "pr_number")),
		Timestamp:     getString(in.Row, "extracted_at"),
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelName:     getString(in.Row, "extractor"),
		ModelRole:     distillation.RoleExtractor,
		Text:          string(body),
	}
}
// autoApplyTransform: applier traces. Pure metadata — no text payload.
// When the row lacks a `ts`, RecordedAt is the deterministic fallback
// (mirrors the TS wall-clock leak fix).
func autoApplyTransform(in TransformInput) *distillation.EvidenceRecord {
	ts := getString(in.Row, "ts")
	if ts == "" {
		ts = in.RecordedAt
	}
	file := getString(in.Row, "file")
	keySuffix, taskFile := file, file
	if file == "" {
		keySuffix = fmt.Sprintf("%d", in.LineOffset)
		taskFile = "?"
	}
	rec := &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("auto_apply:%d:%s", timeToMS(ts), keySuffix),
		TaskID:        fmt.Sprintf("auto_apply:%s", taskFile),
		Timestamp:     ts,
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelRole:     distillation.RoleApplier,
	}
	if file != "" {
		rec.SourceFiles = []string{file}
	}
	action := strDefault(in.Row, "action", "unknown")
	switch {
	case action == "committed":
		rec.SuccessMarkers = []string{"committed"}
	case strings.Contains(action, "reverted"):
		// Any reverted-* action is recorded verbatim as the failure marker.
		rec.FailureMarkers = []string{action}
	}
	return rec
}
// observerReviewsTransform: reviewer-class stream. Timestamp falls back
// from `ts` to `reviewed_at`. Mirrors observer_escalations but carries
// verdict + confidence + free-form notes.
func observerReviewsTransform(in TransformInput) *distillation.EvidenceRecord {
	ts := getString(in.Row, "ts")
	if ts == "" {
		ts = getString(in.Row, "reviewed_at")
	}
	file := getString(in.Row, "file")
	keySuffix := file
	if file == "" {
		// keySuffix becomes the line offset, keeping both IDs unique
		// per source line (the original built TaskID twice for the
		// same result).
		keySuffix = fmt.Sprintf("%d", in.LineOffset)
	}
	rec := &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("obs_rev:%d:%s", timeToMS(ts), keySuffix),
		TaskID:        fmt.Sprintf("observer_review:%s", keySuffix),
		Timestamp:     ts,
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelRole:     distillation.RoleReviewer,
	}
	if verdict, ok := in.Row["verdict"].(string); ok && verdict != "" {
		rec.ObserverVerdict = distillation.ObserverVerdict(verdict)
	}
	if conf, ok := numFloat(in.Row, "confidence"); ok {
		rec.ObserverConfidence = conf
	}
	if notes := flattenNotes(in.Row, "notes"); len(notes) > 0 {
		rec.ObserverNotes = notes
	}
	// NOTE(review): getString renders a non-string `notes` value (e.g.
	// an array) via %v formatting into Text; preserved as-is from the
	// original — confirm against the TS port whether that is intended.
	if text := getString(in.Row, "notes"); text != "" {
		rec.Text = text
	} else if review := getString(in.Row, "review"); review != "" {
		rec.Text = review
	}
	return rec
}
// auditsTransform: per-finding auditor stream. Severity drives the
// marker shape — info/low → success, medium → non-fatal failure,
// high/critical → blocking failure, anything else → no marker.
//
// Determinism note: the TS port falls back to `new Date().toISOString()`
// when `ts` is missing, which is non-deterministic. This port uses
// RecordedAt as the deterministic fallback (same pattern as the
// auto_apply fix).
func auditsTransform(in TransformInput) *distillation.EvidenceRecord {
	sev := strings.ToLower(strDefault(in.Row, "severity", "unknown"))
	keySuffix := getString(in.Row, "finding_id")
	if keySuffix == "" {
		keySuffix = fmt.Sprintf("%d", in.LineOffset)
	}
	taskID := "audit_finding"
	if phase := getString(in.Row, "phase"); phase != "" {
		taskID = fmt.Sprintf("phase:%s", phase)
	}
	ts := getString(in.Row, "ts")
	if ts == "" {
		ts = in.RecordedAt
	}
	rec := &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("audit_finding:%s", keySuffix),
		TaskID:        taskID,
		Timestamp:     ts,
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelRole:     distillation.RoleReviewer,
	}
	// Severity classes are mutually exclusive, so a switch expresses
	// the original's independent if-chains exactly.
	switch sev {
	case "info", "low":
		rec.SuccessMarkers = []string{fmt.Sprintf("audit_severity_%s", sev)}
	case "high", "critical":
		rec.FailureMarkers = []string{fmt.Sprintf("audit_severity_%s", sev)}
	case "medium":
		rec.FailureMarkers = []string{"audit_severity_medium"}
	}
	if evidence, ok := in.Row["evidence"].(string); ok && evidence != "" {
		rec.Text = evidence
	} else {
		rec.Text = getString(in.Row, "resolution")
	}
	return rec
}
// outcomesTransform: command-runner outcome stream. Latency comes from
// elapsed_secs (× 1000, rounded); the success marker fires only when
// every event in the run succeeded.
func outcomesTransform(in TransformInput) *distillation.EvidenceRecord {
	fallbackID := fmt.Sprintf("%d", in.LineOffset)
	rec := &distillation.EvidenceRecord{
		RunID:         fmt.Sprintf("outcome:%s", strDefault(in.Row, "run_id", fallbackID)),
		Timestamp:     getString(in.Row, "created_at"),
		SchemaVersion: distillation.EvidenceSchemaVersion,
		Provenance:    provenance(in),
		ModelRole:     distillation.RoleExecutor,
	}
	sigHash := getString(in.Row, "sig_hash")
	if sigHash == "" {
		rec.TaskID = fmt.Sprintf("outcome:%d", in.LineOffset)
	} else {
		rec.TaskID = fmt.Sprintf("outcome_sig:%s", sigHash)
	}
	if elapsed, ok := numFloat(in.Row, "elapsed_secs"); ok {
		rec.LatencyMs = int64(elapsed*1000 + 0.5) // round half up
	}
	okEvents, haveOK := numInt(in.Row, "ok_events")
	totalEvents, haveTotal := numInt(in.Row, "total_events")
	if haveOK && haveTotal && totalEvents > 0 && okEvents == totalEvents {
		rec.SuccessMarkers = []string{"all_events_ok"}
	}
	if gaps, ok := numInt(in.Row, "total_gap_signals"); ok {
		vr := map[string]any{"gap_signals": gaps}
		if citations, ok := numInt(in.Row, "total_citations"); ok {
			vr["citation_count"] = citations
		}
		rec.ValidationResults = vr
	}
	return rec
}
// ─── Helpers — coercion + extraction patterns shared by transforms ──
// provenance builds the Provenance stamp shared by every transform:
// which source file, which line, the row's pre-computed hash, and the
// caller-supplied recording time.
func provenance(in TransformInput) distillation.Provenance {
	var p distillation.Provenance
	p.SourceFile = in.SourceFileRelPath
	p.LineOffset = in.LineOffset
	p.SigHash = in.SigHash
	p.RecordedAt = in.RecordedAt
	return p
}
// stemFor extracts the basename without its .jsonl extension, e.g.
// "data/_kb/distilled_facts.jsonl" → "distilled_facts". Paths without
// a slash or without the suffix pass through accordingly.
func stemFor(relpath string) string {
	base := relpath
	if slash := strings.LastIndexByte(relpath, '/'); slash >= 0 {
		base = relpath[slash+1:]
	}
	return strings.TrimSuffix(base, ".jsonl")
}
// getString coerces row[key] to a string. Missing keys and explicit
// nulls yield "". Strings pass through untouched; every other JSON
// value is rendered with fmt's default %v formatting (bools as
// true/false, numbers in shortest form) — the original's per-type
// cases all produced the %v result anyway.
func getString(row map[string]any, key string) string {
	v, ok := row[key]
	if !ok || v == nil {
		return ""
	}
	if s, isStr := v.(string); isStr {
		return s
	}
	return fmt.Sprintf("%v", v)
}
// strDefault coerces row[key] to a string, substituting fallback when
// the key is absent or the coerced value is empty.
func strDefault(row map[string]any, key, fallback string) string {
	s := getString(row, key)
	if s == "" {
		return fallback
	}
	return s
}
// numInt reads row[key] as an int64. JSON decoding yields float64 (the
// common case, truncated toward zero); int/int64 are accepted for rows
// built in-process. Returns (0, false) when missing, nil, or
// non-numeric.
func numInt(row map[string]any, key string) (int64, bool) {
	v, ok := row[key]
	if !ok || v == nil {
		return 0, false
	}
	if f, isFloat := v.(float64); isFloat {
		return int64(f), true
	}
	if i, isInt := v.(int); isInt {
		return int64(i), true
	}
	if i, isInt64 := v.(int64); isInt64 {
		return i, true
	}
	return 0, false
}
// numFloat reads row[key] as a float64, accepting float64/int/int64.
// Missing keys, nulls, and non-numeric values all land in the default
// arm and return (0, false).
func numFloat(row map[string]any, key string) (float64, bool) {
	switch n := row[key].(type) {
	case float64:
		return n, true
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	default:
		return 0, false
	}
}
// boolField returns (value, present). present is false when the key is
// missing or holds a non-bool, letting callers distinguish an explicit
// `false` from "no field at all". A type assertion on the map lookup
// covers all three cases at once (a missing key yields nil, which
// fails the assertion).
func boolField(row map[string]any, key string) (bool, bool) {
	b, isBool := row[key].(bool)
	return b, isBool
}
// arrayLen reports len(row[key]) when the value is a JSON array, and 0
// for anything else (missing, null, scalar, object).
func arrayLen(row map[string]any, key string) int {
	a, isArray := row[key].([]any)
	if !isArray {
		return 0
	}
	return len(a)
}
// objectKeys returns the sorted keys of row[key] when it holds a
// non-empty JSON object. Returns nil when the field is missing,
// non-map, or empty, so callers can treat an empty corpus list as
// "field absent".
func objectKeys(row map[string]any, key string) []string {
	m, ok := row[key].(map[string]any)
	if !ok || len(m) == 0 {
		return nil
	}
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	// Sort for determinism — TS Object.keys() order is insertion-order
	// in modern engines but Go map iteration is randomized. Use the
	// stdlib sort rather than the hand-rolled insertion sort.
	sort.Strings(keys)
	return keys
}
// flattenNotes coerces row[key] from string OR array-of-strings into a
// non-empty []string, dropping empty strings and non-string elements.
// Returns nil when nothing survives. Go equivalent of the TS
// `[x].flat().filter(Boolean)` idiom.
func flattenNotes(row map[string]any, key string) []string {
	switch v := row[key].(type) {
	case string:
		if v != "" {
			return []string{v}
		}
	case []any:
		var out []string
		for _, item := range v {
			if s, isStr := item.(string); isStr && s != "" {
				out = append(out, s)
			}
		}
		if len(out) > 0 {
			return out
		}
	}
	// Missing, nil, wrong type, or nothing survived the filter.
	return nil
}
// timeToMS converts an ISO 8601 timestamp to milliseconds since the
// Unix epoch, matching TS `new Date(iso).getTime()`. Empty or
// unparseable input yields 0 (where TS coerces to NaN; the Go zero is
// the more useful sentinel in run_id construction).
func timeToMS(iso string) int64 {
	if iso == "" {
		return 0
	}
	if t, err := time.Parse(time.RFC3339Nano, iso); err == nil {
		return t.UnixMilli()
	}
	if t, err := time.Parse(time.RFC3339, iso); err == nil {
		return t.UnixMilli()
	}
	return 0
}
// buildRetrievedContext assembles a RetrievedContext from a flat map of
// already-coerced fields. Returns nil when no field carries a value, so
// transforms can attach the result conditionally without wrapping the
// call site.
func buildRetrievedContext(fields map[string]any) *distillation.RetrievedContext {
	rc := distillation.RetrievedContext{}
	// FIX: renamed from `any` — the original shadowed the predeclared
	// `any` type alias, which was confusing and blocked using `any` as
	// a type inside this function.
	populated := false
	if v, ok := fields["matrix_corpora"].([]string); ok && len(v) > 0 {
		rc.MatrixCorpora = v
		populated = true
	}
	if v, ok := numFromAny(fields["matrix_hits"]); ok {
		rc.MatrixHits = int(v)
		populated = true
	}
	if v, ok := numFromAny(fields["matrix_chunks_kept"]); ok {
		rc.MatrixChunksKept = int(v)
		populated = true
	}
	if v, ok := numFromAny(fields["matrix_chunks_dropped"]); ok {
		rc.MatrixChunksDropped = int(v)
		populated = true
	}
	if v, ok := numFromAny(fields["pathway_fingerprints_seen"]); ok {
		rc.PathwayFingerprintsSeen = int(v)
		populated = true
	}
	if !populated {
		return nil
	}
	return &rc
}
// numFromAny coerces a loose value to float64, accepting the numeric
// types that appear in decoded JSON (float64) and in-process rows
// (int/int64). Nil and everything else fall through to (0, false).
func numFromAny(v any) (float64, bool) {
	switch n := v.(type) {
	case float64:
		return n, true
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	default:
		return 0, false
	}
}
// sortInPlace sorts s ascending in place. Kept for interface
// compatibility with existing callers; delegates to the stdlib sort
// instead of the previous hand-rolled insertion sort (same result,
// standard implementation).
func sortInPlace(s []string) {
	sort.Strings(s)
}