root 89ca72d471 materializer + replay ports + vectord substrate fix verified at scale
Three threads land together — their doc edits interleave, so they ship
in a single commit.

1. **vectord substrate fix verified at original scale** (closes the
   2026-05-01 thread). Re-ran multitier 5min @ conc=50: 132,211
   scenarios at 438/sec, 6/6 classes at 0% failure (was 4/6 pre-fix).
   Throughput dropped 1,115 → 438/sec because previously-broken
   scenarios now do real HNSW Add work — honest cost of correctness.
   The fix (i.vectors side-store + safeGraphAdd recover wrappers +
   smallIndexRebuildThreshold=32 + saveTask coalescing) holds at the
   footprint that originally surfaced the bug.

2. **Materializer port** — internal/materializer + cmd/materializer +
   scripts/materializer_smoke.sh. Ports scripts/distillation/transforms.ts
   (12 transforms) + build_evidence_index.ts (idempotency, day-partition,
   receipt). On-wire JSON shape matches TS so Bun and Go runs are
   interchangeable. 14 tests green.

3. **Replay port** — internal/replay + cmd/replay +
   scripts/replay_smoke.sh. Ports scripts/distillation/replay.ts
   (retrieve → bundle → /v1/chat → validate → log). Closes audit-FULL
   phase 7 live invocation on the Go side. Both runtimes append to the
   same data/_kb/replay_runs.jsonl (schema=replay_run.v1). 14 tests green.

Side effect on internal/distillation/types.go: EvidenceRecord gained
prompt_tokens, completion_tokens, and metadata fields to mirror the TS
shape the materializer transforms produce.

STATE_OF_PLAY refreshed to 2026-05-02; ARCHITECTURE_COMPARISON decisions
tracker moves the materializer + replay items from _open_ to DONE and
adds the substrate-fix scale verification row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 03:31:02 -05:00

216 lines
5.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package replay
import (
	"bufio"
	"encoding/json"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strings"
	"unicode/utf8"
)
// tokenize lower-cases text and breaks it into maximal runs of the ASCII
// characters [a-z0-9_]. Every other rune, including non-ASCII letters, acts
// as a separator. Runs shorter than 3 characters are discarded and the rest
// are returned as a set. Matches replay.ts so retrieval scoring is
// consistent across runtimes.
func tokenize(text string) map[string]struct{} {
	tokens := make(map[string]struct{})
	isSeparator := func(r rune) bool {
		switch {
		case 'a' <= r && r <= 'z', '0' <= r && r <= '9', r == '_':
			return false
		default:
			return true
		}
	}
	for _, field := range strings.FieldsFunc(strings.ToLower(text), isSeparator) {
		if len(field) < 3 {
			continue
		}
		tokens[field] = struct{}{}
	}
	return tokens
}
// jaccard computes the Jaccard similarity |A ∩ B| / |A ∪ B| of two token
// sets. It returns 0 when either set is empty.
func jaccard(a, b map[string]struct{}) float64 {
	if len(a) == 0 || len(b) == 0 {
		return 0
	}
	// Probe the larger set while walking the smaller one; the intersection
	// size is the same either way.
	small, large := a, b
	if len(small) > len(large) {
		small, large = large, small
	}
	shared := 0
	for tok := range small {
		if _, hit := large[tok]; hit {
			shared++
		}
	}
	// Both sets are non-empty here, so the union size is at least 1.
	return float64(shared) / float64(len(a)+len(b)-shared)
}
// LoadRagCorpus parses `exports/rag/playbooks.jsonl` beneath root into
// RagSample records, one JSON object per line.
//
// A missing file is not an error. It yields a nil corpus and a nil error,
// so callers can fall back to a context-less prompt. Blank and malformed
// lines are skipped, matching the TS loader. Any other open error is
// returned as-is. If scanning stops early (for example, a line exceeds the
// scanner's 1<<24-byte limit), the records parsed so far are returned
// together with the scanner's error.
func LoadRagCorpus(root string) ([]RagSample, error) {
	corpusPath := filepath.Join(root, "exports", "rag", "playbooks.jsonl")
	file, err := os.Open(corpusPath)
	if os.IsNotExist(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	const maxLine = 1 << 24 // JSONL records can be far larger than bufio's 64 KiB default
	scanner.Buffer(make([]byte, 0, 1<<16), maxLine)

	var samples []RagSample
	for scanner.Scan() {
		raw := scanner.Bytes()
		if len(raw) == 0 {
			continue
		}
		var sample RagSample
		if json.Unmarshal(raw, &sample) != nil {
			continue // malformed line — skip, matches TS behavior
		}
		samples = append(samples, sample)
	}
	return samples, scanner.Err()
}
// retrieveRag scores every corpus entry against task and returns up to
// topK of them, best first. Matches replay.ts.
//
// An entry's score is the Jaccard similarity between the task's token set
// and the token set of the entry's title, content and tags. Only strictly
// positive scores are returned. Ties keep corpus order because the sort is
// stable. The result is never nil, so it encodes as [] rather than null.
// A non-positive topK, or a task with no usable tokens, yields an empty
// slice.
func retrieveRag(corpus []RagSample, task string, topK int) []RetrievedArtifact {
	taskTokens := tokenize(task)
	// A negative topK would make make() panic on a negative capacity. An
	// empty task token set makes every Jaccard score 0, so nothing could
	// qualify anyway.
	if topK <= 0 || len(taskTokens) == 0 {
		return []RetrievedArtifact{}
	}

	type scored struct {
		rec   RagSample
		score float64
	}
	all := make([]scored, 0, len(corpus))
	for _, r := range corpus {
		text := r.Title + " " + r.Content + " " + strings.Join(r.Tags, " ")
		all = append(all, scored{rec: r, score: jaccard(taskTokens, tokenize(text))})
	}
	sort.SliceStable(all, func(i, j int) bool { return all[i].score > all[j].score })

	// Never reserve more capacity than the corpus can supply, even for a huge topK.
	capacity := topK
	if len(corpus) < capacity {
		capacity = len(corpus)
	}
	out := make([]RetrievedArtifact, 0, capacity)
	for _, s := range all {
		// Scores are sorted descending, so the first non-positive score ends the run.
		if len(out) >= topK || s.score <= 0 {
			break
		}
		out = append(out, RetrievedArtifact{
			RagID:          s.rec.ID,
			SourceRunID:    s.rec.SourceRunID,
			Title:          s.rec.Title,
			ContentPreview: trim(s.rec.Content, 240),
			SuccessScore:   s.rec.SuccessScore,
			Tags:           tagsOrEmpty(s.rec.Tags),
			Score:          s.score,
		})
	}
	return out
}
// validationLineRE recognises a validation instruction on an already
// trimmed line. It matches case-insensitively in either of two forms:
//   - a "-" or "*" bullet followed by verify/check/assert/confirm/ensure
//     as a whole word;
//   - a line that opens with one of those keywords followed by whitespace.
var validationLineRE = regexp.MustCompile(`(?i)^[-*]\s*(verify|check|assert|confirm|ensure)\b|^\s*(verify|check|assert|confirm|ensure)\s`)

// extractValidationSteps scans the content of the corpus entries whose IDs
// appear in samples and collects the lines that look like validation
// instructions (see validationLineRE). Each step is whitespace-trimmed,
// then cut to at most 200 bytes. Collection stops after 6 steps.
//
// Entries are visited in corpus order, not in samples order. A nil slice
// is returned when no line matches.
func extractValidationSteps(samples []RetrievedArtifact, corpus []RagSample) []string {
	const maxSteps = 6
	wanted := make(map[string]bool, len(samples))
	for _, s := range samples {
		wanted[s.RagID] = true
	}

	var steps []string
	for _, rec := range corpus {
		if !wanted[rec.ID] {
			continue
		}
		lines := strings.Split(rec.Content, "\n")
		for _, line := range lines {
			candidate := strings.TrimSpace(line)
			if !validationLineRE.MatchString(candidate) {
				continue
			}
			steps = append(steps, trim(candidate, 200))
			if len(steps) == maxSteps {
				return steps
			}
		}
	}
	return steps
}
// BuildContextBundle assembles the retrieval context for a replay task.
//
// It works in four steps:
//  1. Retrieve the top 8 playbooks for task.
//  2. Keep up to 3 whose success_score is "accepted" as prior successful
//     outputs, and up to 2 marked "partially_accepted" as failure patterns.
//  3. Pull validation steps from the accepted ones.
//  4. Estimate the token cost as ceil(bytes/4) over the titles, content
//     previews and steps the bundle carries. Bytes are counted, not
//     characters.
func BuildContextBundle(corpus []RagSample, task string) *ContextBundle {
	retrieved := retrieveRag(corpus, task, 8)
	prior := filterByScore(retrieved, "accepted", 3)
	cautions := filterByScore(retrieved, "partially_accepted", 2)
	validation := extractValidationSteps(prior, corpus)

	size := 0
	for _, group := range [][]RetrievedArtifact{prior, cautions} {
		for _, art := range group {
			size += len(art.Title) + len(art.ContentPreview)
		}
	}
	for _, step := range validation {
		size += len(step)
	}

	return &ContextBundle{
		RetrievedPlaybooks:     retrieved,
		PriorSuccessfulOutputs: prior,
		FailurePatterns:        cautions,
		ValidationSteps:        stepsOrEmpty(validation),
		BundleTokenEstimate:    (size + 3) / 4, // integer ceil(size/4)
	}
}
// filterByScore returns, in input order, the first limit artifacts whose
// SuccessScore equals score. The result is never nil, so it encodes as []
// in JSON. A non-positive limit yields an empty slice. Previously, a limit
// of 0 still returned one match and a negative limit panicked in make.
func filterByScore(arts []RetrievedArtifact, score string, limit int) []RetrievedArtifact {
	if limit <= 0 {
		return []RetrievedArtifact{}
	}
	out := make([]RetrievedArtifact, 0, limit)
	for _, a := range arts {
		// Check the cap before appending so the result never exceeds limit.
		if len(out) >= limit {
			break
		}
		if a.SuccessScore == score {
			out = append(out, a)
		}
	}
	return out
}
// tagsOrEmpty returns t unchanged when it is non-nil, and a fresh empty
// slice otherwise, so JSON encodes missing tags as [] instead of null.
func tagsOrEmpty(t []string) []string {
	if t != nil {
		return t
	}
	return make([]string, 0)
}
// stepsOrEmpty normalises a nil step list to an empty, non-nil slice so it
// serialises as [] rather than null. Non-nil input is passed through.
func stepsOrEmpty(s []string) []string {
	out := s
	if out == nil {
		out = []string{}
	}
	return out
}
// trim returns s cut to at most n bytes without splitting a UTF-8
// sequence. When byte n falls inside a multi-byte rune, the cut backs off
// to the start of that rune. This keeps previews and steps valid UTF-8
// instead of letting json.Marshal replace a torn rune with U+FFFD. Strings
// already within n bytes are returned unchanged. A non-positive n yields ""
// rather than panicking.
func trim(s string, n int) string {
	if len(s) <= n {
		return s
	}
	if n <= 0 {
		return ""
	}
	cut := n
	// s[cut] is the first excluded byte. While it is a continuation byte,
	// the cut would land mid-rune, so move it left to the rune's first byte.
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}